Andy Babic 2024-04-27 12:39:02 +00:00 committed by GitHub
commit a27e8f075d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 516 additions and 398 deletions

View file

@@ -75,398 +75,6 @@ class StreamBlockValidationError(ValidationError):
return result
class BaseStreamBlock(Block):
def __init__(self, local_blocks=None, search_index=True, **kwargs):
self._constructor_kwargs = kwargs
self.search_index = search_index
super().__init__(**kwargs)
# create a local (shallow) copy of base_blocks so that it can be supplemented by local_blocks
self.child_blocks = self.base_blocks.copy()
if local_blocks:
for name, block in local_blocks:
block.set_name(name)
self.child_blocks[name] = block
def empty_value(self, raw_text=None):
return StreamValue(self, [], raw_text=raw_text)
def sorted_child_blocks(self):
"""Child blocks, sorted in to their groups."""
return sorted(
self.child_blocks.values(), key=lambda child_block: child_block.meta.group
)
def grouped_child_blocks(self):
"""
The available child block types of this stream block, organised into groups according to
their meta.group attribute.
Returned as an iterable of (group_name, iterator_of_blocks) tuples
"""
return itertools.groupby(
self.sorted_child_blocks(), key=lambda child_block: child_block.meta.group
)
def value_from_datadict(self, data, files, prefix):
count = int(data["%s-count" % prefix])
values_with_indexes = []
for i in range(0, count):
if data["%s-%d-deleted" % (prefix, i)]:
continue
block_type_name = data["%s-%d-type" % (prefix, i)]
try:
child_block = self.child_blocks[block_type_name]
except KeyError:
continue
values_with_indexes.append(
(
int(data["%s-%d-order" % (prefix, i)]),
block_type_name,
child_block.value_from_datadict(
data, files, "%s-%d-value" % (prefix, i)
),
data.get("%s-%d-id" % (prefix, i)),
)
)
values_with_indexes.sort()
return StreamValue(
self,
[
(child_block_type_name, value, block_id)
for (
index,
child_block_type_name,
value,
block_id,
) in values_with_indexes
],
)
def value_omitted_from_data(self, data, files, prefix):
return ("%s-count" % prefix) not in data
@property
def required(self):
return self.meta.required
def clean(self, value):
cleaned_data = []
errors = {}
non_block_errors = ErrorList()
for i, child in enumerate(value): # child is a StreamChild instance
try:
cleaned_data.append(
(child.block.name, child.block.clean(child.value), child.id)
)
except ValidationError as e:
errors[i] = e
if self.meta.min_num is not None and self.meta.min_num > len(value):
non_block_errors.append(
ValidationError(
_("The minimum number of items is %(min_num)d")
% {"min_num": self.meta.min_num}
)
)
elif self.required and len(value) == 0:
non_block_errors.append(ValidationError(_("This field is required.")))
if self.meta.max_num is not None and self.meta.max_num < len(value):
non_block_errors.append(
ValidationError(
_("The maximum number of items is %(max_num)d")
% {"max_num": self.meta.max_num}
)
)
if self.meta.block_counts:
block_counts = defaultdict(int)
for item in value:
block_counts[item.block_type] += 1
for block_name, min_max in self.meta.block_counts.items():
block = self.child_blocks[block_name]
max_num = min_max.get("max_num", None)
min_num = min_max.get("min_num", None)
block_count = block_counts[block_name]
if min_num is not None and min_num > block_count:
non_block_errors.append(
ValidationError(
"{}: {}".format(
block.label,
_("The minimum number of items is %(min_num)d")
% {"min_num": min_num},
)
)
)
if max_num is not None and max_num < block_count:
non_block_errors.append(
ValidationError(
"{}: {}".format(
block.label,
_("The maximum number of items is %(max_num)d")
% {"max_num": max_num},
)
)
)
if errors or non_block_errors:
# The message here is arbitrary - outputting error messages is delegated to the child blocks,
# which only involves the 'params' list
raise StreamBlockValidationError(
block_errors=errors, non_block_errors=non_block_errors
)
return StreamValue(self, cleaned_data)
def to_python(self, value):
if isinstance(value, StreamValue):
return value
elif isinstance(value, str) and value:
try:
value = json.loads(value)
except ValueError:
# value is not valid JSON; most likely, this field was previously a
# rich text field before being migrated to StreamField, and the data
# was left intact in the migration. Return an empty stream instead
# (but keep the raw text available as an attribute, so that it can be
# used to migrate that data to StreamField)
return self.empty_value(raw_text=value)
if not value:
return self.empty_value()
# ensure value is a list and not some other kind of iterable
value = list(value)
if isinstance(value[0], dict):
# value is in JSONish representation - a dict with 'type' and 'value' keys.
# This is passed to StreamValue to be expanded lazily - but first we reject any unrecognised
# block types from the list
return StreamValue(
self,
[
child_data
for child_data in value
if child_data["type"] in self.child_blocks
],
is_lazy=True,
)
else:
# See if it looks like the standard non-smart representation of a
# StreamField value: a list of (block_name, value) tuples
try:
[None for (x, y) in value]
except (TypeError, ValueError) as exc:
# Give up trying to make sense of the value
raise TypeError(
f"Cannot handle {value!r} (type {type(value)!r}) as a value of a StreamBlock"
) from exc
# Test succeeded, so return as a StreamValue-ified version of that value
return StreamValue(
self,
[
(k, self.child_blocks[k].normalize(v))
for k, v in value
if k in self.child_blocks
],
)
def bulk_to_python(self, values):
# 'values' is a list of streams, each stream being a list of dicts with 'type', 'value' and
# optionally 'id'.
# We will iterate over these streams, constructing:
# 1) a set of per-child-block lists ('child_inputs'), to be sent to each child block's
# bulk_to_python method in turn (giving us 'child_outputs')
# 2) a 'block map' of each stream, telling us the type and id of each block and the index we
# need to look up in the corresponding child_outputs list to obtain its final value
child_inputs = defaultdict(list)
block_maps = []
for stream in values:
block_map = []
for block_dict in stream:
block_type = block_dict["type"]
if block_type not in self.child_blocks:
# skip any blocks with an unrecognised type
continue
child_input_list = child_inputs[block_type]
child_index = len(child_input_list)
child_input_list.append(block_dict["value"])
block_map.append((block_type, block_dict.get("id"), child_index))
block_maps.append(block_map)
# run each list in child_inputs through the relevant block's bulk_to_python
# to obtain child_outputs
child_outputs = {
block_type: self.child_blocks[block_type].bulk_to_python(child_input_list)
for block_type, child_input_list in child_inputs.items()
}
# for each stream, go through the block map, picking out the appropriately-indexed
# value from the relevant list in child_outputs
return [
StreamValue(
self,
[
(block_type, child_outputs[block_type][child_index], id)
for block_type, id, child_index in block_map
],
is_lazy=False,
)
for block_map in block_maps
]
def get_prep_value(self, value):
if not value:
# Falsy values (including None, empty string, empty list, and
# empty StreamValue) become an empty stream
return []
else:
# value is a StreamValue - delegate to its get_prep_value() method
# (which has special-case handling for lazy StreamValues to avoid useless
# round-trips to the full data representation and back)
return value.get_prep_value()
def normalize(self, value):
return self.to_python(value)
def get_form_state(self, value):
if not value:
return []
else:
return [
{
"type": child.block.name,
"value": child.block.get_form_state(child.value),
"id": child.id,
}
for child in value
]
def get_api_representation(self, value, context=None):
if value is None:
# treat None as identical to an empty stream
return []
return [
{
"type": child.block.name,
"value": child.block.get_api_representation(
child.value, context=context
),
"id": child.id,
}
for child in value # child is a StreamChild instance
]
def render_basic(self, value, context=None):
return format_html_join(
"\n",
'<div class="block-{1}">{0}</div>',
[(child.render(context=context), child.block_type) for child in value],
)
def get_searchable_content(self, value):
if not self.search_index:
return []
content = []
for child in value:
content.extend(child.block.get_searchable_content(child.value))
return content
def extract_references(self, value):
for child in value:
for (
model,
object_id,
model_path,
content_path,
) in child.block.extract_references(child.value):
model_path = (
f"{child.block_type}.{model_path}"
if model_path
else child.block_type
)
content_path = (
f"{child.id}.{content_path}" if content_path else child.id
)
yield model, object_id, model_path, content_path
def get_block_by_content_path(self, value, path_elements):
"""
Given a list of elements from a content path, retrieve the block at that path
as a BoundBlock object, or None if the path does not correspond to a valid block.
"""
if path_elements:
id, *remaining_elements = path_elements
for child in value:
if child.id == id:
return child.block.get_block_by_content_path(
child.value, remaining_elements
)
else:
# an empty path refers to the stream as a whole
return self.bind(value)
def deconstruct(self):
"""
Always deconstruct StreamBlock instances as if they were plain StreamBlocks with all of the
field definitions passed to the constructor - even if in reality this is a subclass of StreamBlock
with the fields defined declaratively, or some combination of the two.
This ensures that the field definitions get frozen into migrations, rather than leaving a reference
to a custom subclass in the user's models.py that may or may not stick around.
"""
path = "wagtail.blocks.StreamBlock"
args = [list(self.child_blocks.items())]
kwargs = self._constructor_kwargs
return (path, args, kwargs)
def check(self, **kwargs):
errors = super().check(**kwargs)
for name, child_block in self.child_blocks.items():
errors.extend(child_block.check(**kwargs))
errors.extend(child_block._check_name(**kwargs))
return errors
class Meta:
# No icon specified here, because that depends on the purpose that the
# block is being used for. Feel encouraged to specify an icon in your
# descendant block type
icon = "placeholder"
default = []
required = True
form_classname = None
min_num = None
max_num = None
block_counts = {}
collapsed = False
MUTABLE_META_ATTRIBUTES = [
"required",
"min_num",
"max_num",
"block_counts",
"collapsed",
]
class StreamBlock(BaseStreamBlock, metaclass=DeclarativeSubBlocksMetaclass):
pass
class StreamValue(MutableSequence):
"""
Custom type used to represent the value of a StreamBlock; behaves as a sequence of BoundBlocks
@@ -801,6 +409,400 @@ class StreamValue(MutableSequence):
)
class BaseStreamBlock(Block):
def __init__(self, local_blocks=None, search_index=True, **kwargs):
self._constructor_kwargs = kwargs
self.search_index = search_index
super().__init__(**kwargs)
# create a local (shallow) copy of base_blocks so that it can be supplemented by local_blocks
self.child_blocks = self.base_blocks.copy()
if local_blocks:
for name, block in local_blocks:
block.set_name(name)
self.child_blocks[name] = block
def empty_value(self, raw_text=None):
return self.meta.value_class(self, [], raw_text=raw_text)
def sorted_child_blocks(self):
"""Child blocks, sorted in to their groups."""
return sorted(
self.child_blocks.values(), key=lambda child_block: child_block.meta.group
)
def grouped_child_blocks(self):
"""
The available child block types of this stream block, organised into groups according to
their meta.group attribute.
Returned as an iterable of (group_name, iterator_of_blocks) tuples
"""
return itertools.groupby(
self.sorted_child_blocks(), key=lambda child_block: child_block.meta.group
)
def value_from_datadict(self, data, files, prefix):
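# A sketch of the submitted form data this parses (key names taken from the
# lookups below), for a stream at prefix "body" with one child of type
# "heading" backed by a CharBlock:
# {"body-count": "1", "body-0-deleted": "", "body-0-type": "heading",
#  "body-0-order": "0", "body-0-id": "some-uuid", "body-0-value": "Hello"}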
count = int(data["%s-count" % prefix])
values_with_indexes = []
for i in range(0, count):
if data["%s-%d-deleted" % (prefix, i)]:
continue
block_type_name = data["%s-%d-type" % (prefix, i)]
try:
child_block = self.child_blocks[block_type_name]
except KeyError:
continue
values_with_indexes.append(
(
int(data["%s-%d-order" % (prefix, i)]),
block_type_name,
child_block.value_from_datadict(
data, files, "%s-%d-value" % (prefix, i)
),
data.get("%s-%d-id" % (prefix, i)),
)
)
values_with_indexes.sort()
return self.meta.value_class(
self,
[
(child_block_type_name, value, block_id)
for (
index,
child_block_type_name,
value,
block_id,
) in values_with_indexes
],
)
def value_omitted_from_data(self, data, files, prefix):
return ("%s-count" % prefix) not in data
@property
def required(self):
return self.meta.required
def clean(self, value):
cleaned_data = []
errors = {}
non_block_errors = ErrorList()
for i, child in enumerate(value): # child is a StreamChild instance
try:
cleaned_data.append(
(child.block.name, child.block.clean(child.value), child.id)
)
except ValidationError as e:
errors[i] = e
if self.meta.min_num is not None and self.meta.min_num > len(value):
non_block_errors.append(
ValidationError(
_("The minimum number of items is %(min_num)d")
% {"min_num": self.meta.min_num}
)
)
elif self.required and len(value) == 0:
non_block_errors.append(ValidationError(_("This field is required.")))
if self.meta.max_num is not None and self.meta.max_num < len(value):
non_block_errors.append(
ValidationError(
_("The maximum number of items is %(max_num)d")
% {"max_num": self.meta.max_num}
)
)
if self.meta.block_counts:
block_counts = defaultdict(int)
for item in value:
block_counts[item.block_type] += 1
for block_name, min_max in self.meta.block_counts.items():
block = self.child_blocks[block_name]
max_num = min_max.get("max_num", None)
min_num = min_max.get("min_num", None)
block_count = block_counts[block_name]
if min_num is not None and min_num > block_count:
non_block_errors.append(
ValidationError(
"{}: {}".format(
block.label,
_("The minimum number of items is %(min_num)d")
% {"min_num": min_num},
)
)
)
if max_num is not None and max_num < block_count:
non_block_errors.append(
ValidationError(
"{}: {}".format(
block.label,
_("The maximum number of items is %(max_num)d")
% {"max_num": max_num},
)
)
)
if errors or non_block_errors:
# The message here is arbitrary - outputting error messages is delegated to the child blocks,
# which only involves the 'params' list
raise StreamBlockValidationError(
block_errors=errors, non_block_errors=non_block_errors
)
return self.meta.value_class(self, cleaned_data)
def to_python(self, value):
if isinstance(value, self.meta.value_class):
return value
elif isinstance(value, str) and value:
try:
value = json.loads(value)
except ValueError:
# value is not valid JSON; most likely, this field was previously a
# rich text field before being migrated to StreamField, and the data
# was left intact in the migration. Return an empty stream instead
# (but keep the raw text available as an attribute, so that it can be
# used to migrate that data to StreamField)
return self.empty_value(raw_text=value)
if not value:
return self.empty_value()
# ensure value is a list and not some other kind of iterable
value = list(value)
if isinstance(value[0], dict):
# value is in JSONish representation - a dict with 'type' and 'value' keys.
# This is passed to StreamValue to be expanded lazily - but first we reject any unrecognised
# block types from the list
return self.meta.value_class(
self,
[
child_data
for child_data in value
if child_data["type"] in self.child_blocks
],
is_lazy=True,
)
else:
# See if it looks like the standard non-smart representation of a
# StreamField value: a list of (block_name, value) tuples
try:
[None for (x, y) in value]
except (TypeError, ValueError) as exc:
# Give up trying to make sense of the value
raise TypeError(
f"Cannot handle {value!r} (type {type(value)!r}) as a value of a StreamBlock"
) from exc
# Test succeeded, so return as a StreamValue-ified version of that value
return self.meta.value_class(
self,
[
(k, self.child_blocks[k].normalize(v))
for k, v in value
if k in self.child_blocks
],
)
def bulk_to_python(self, values):
# 'values' is a list of streams, each stream being a list of dicts with 'type', 'value' and
# optionally 'id'.
# We will iterate over these streams, constructing:
# 1) a set of per-child-block lists ('child_inputs'), to be sent to each child block's
# bulk_to_python method in turn (giving us 'child_outputs')
# 2) a 'block map' of each stream, telling us the type and id of each block and the index we
# need to look up in the corresponding child_outputs list to obtain its final value
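# A worked example: given
# values = [[{"type": "text", "value": "a"}, {"type": "image", "value": 1}],
#           [{"type": "text", "value": "b"}]]
# child_inputs ends up as {"text": ["a", "b"], "image": [1]}, and block_maps as
# [[("text", None, 0), ("image", None, 0)], [("text", None, 1)]]
# (None because no "id" key is present in the input dicts).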
child_inputs = defaultdict(list)
block_maps = []
for stream in values:
block_map = []
for block_dict in stream:
block_type = block_dict["type"]
if block_type not in self.child_blocks:
# skip any blocks with an unrecognised type
continue
child_input_list = child_inputs[block_type]
child_index = len(child_input_list)
child_input_list.append(block_dict["value"])
block_map.append((block_type, block_dict.get("id"), child_index))
block_maps.append(block_map)
# run each list in child_inputs through the relevant block's bulk_to_python
# to obtain child_outputs
child_outputs = {
block_type: self.child_blocks[block_type].bulk_to_python(child_input_list)
for block_type, child_input_list in child_inputs.items()
}
# for each stream, go through the block map, picking out the appropriately-indexed
# value from the relevant list in child_outputs
return [
self.meta.value_class(
self,
[
(block_type, child_outputs[block_type][child_index], id)
for block_type, id, child_index in block_map
],
is_lazy=False,
)
for block_map in block_maps
]
def get_prep_value(self, value):
if not value:
# Falsy values (including None, empty string, empty list, and
# empty StreamValue) become an empty stream
return []
else:
# value is a StreamValue - delegate to its get_prep_value() method
# (which has special-case handling for lazy StreamValues to avoid useless
# round-trips to the full data representation and back)
return value.get_prep_value()
def normalize(self, value):
return self.to_python(value)
def get_form_state(self, value):
if not value:
return []
else:
return [
{
"type": child.block.name,
"value": child.block.get_form_state(child.value),
"id": child.id,
}
for child in value
]
def get_api_representation(self, value, context=None):
if value is None:
# treat None as identical to an empty stream
return []
return [
{
"type": child.block.name,
"value": child.block.get_api_representation(
child.value, context=context
),
"id": child.id,
}
for child in value # child is a StreamChild instance
]
def render_basic(self, value, context=None):
return format_html_join(
"\n",
'<div class="block-{1}">{0}</div>',
[(child.render(context=context), child.block_type) for child in value],
)
def get_searchable_content(self, value):
if not self.search_index:
return []
content = []
for child in value:
content.extend(child.block.get_searchable_content(child.value))
return content
def extract_references(self, value):
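# Illustration with hypothetical values: if a child of block_type "gallery"
# with id "abc123" yields (Image, "42", "image", "caption"), this method
# re-yields it as (Image, "42", "gallery.image", "abc123.caption") - the
# child's type and id are prepended to the model and content paths.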
for child in value:
for (
model,
object_id,
model_path,
content_path,
) in child.block.extract_references(child.value):
model_path = (
f"{child.block_type}.{model_path}"
if model_path
else child.block_type
)
content_path = (
f"{child.id}.{content_path}" if content_path else child.id
)
yield model, object_id, model_path, content_path
def get_block_by_content_path(self, value, path_elements):
"""
Given a list of elements from a content path, retrieve the block at that path
as a BoundBlock object, or None if the path does not correspond to a valid block.
"""
if path_elements:
id, *remaining_elements = path_elements
for child in value:
if child.id == id:
return child.block.get_block_by_content_path(
child.value, remaining_elements
)
else:
# an empty path refers to the stream as a whole
return self.bind(value)
def deconstruct(self):
"""
Always deconstruct StreamBlock instances as if they were plain StreamBlocks with all of the
field definitions passed to the constructor - even if in reality this is a subclass of StreamBlock
with the fields defined declaratively, or some combination of the two.
This ensures that the field definitions get frozen into migrations, rather than leaving a reference
to a custom subclass in the user's models.py that may or may not stick around.
"""
path = "wagtail.blocks.StreamBlock"
args = [list(self.child_blocks.items())]
kwargs = self._constructor_kwargs
return (path, args, kwargs)
def check(self, **kwargs):
errors = super().check(**kwargs)
for name, child_block in self.child_blocks.items():
errors.extend(child_block.check(**kwargs))
errors.extend(child_block._check_name(**kwargs))
return errors
class Meta:
# No icon specified here, because that depends on the purpose that the
# block is being used for. Feel encouraged to specify an icon in your
# descendant block type
icon = "placeholder"
default = []
required = True
form_classname = None
min_num = None
max_num = None
block_counts = {}
collapsed = False
value_class = StreamValue
MUTABLE_META_ATTRIBUTES = [
"required",
"min_num",
"max_num",
"block_counts",
"collapsed",
"value_class",
]
class StreamBlock(BaseStreamBlock, metaclass=DeclarativeSubBlocksMetaclass):
pass
class StreamBlockAdapter(Adapter):
js_constructor = "wagtail.blocks.StreamBlock"
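
Taken together, the change above makes the stream's value type pluggable: every spot that previously constructed StreamValue directly now constructs self.meta.value_class, which defaults to StreamValue. A minimal usage sketch, mirroring the test models added later in this commit (ArticleBlock and MyStreamValue are hypothetical names):

from wagtail import blocks
from wagtail.blocks import StreamValue


class MyStreamValue(StreamValue):
    # hypothetical subclass for illustration; behaves exactly like StreamValue
    def block_types(self):
        return [child.block_type for child in self]


class ArticleBlock(blocks.StreamBlock):
    heading = blocks.CharBlock()
    paragraph = blocks.RichTextBlock()

    class Meta:
        # to_python, clean, value_from_datadict and bulk_to_python all build
        # instances of this class instead of StreamValue
        value_class = MyStreamValue

As the tests below also exercise, value_class can instead be passed as a constructor kwarg (blocks.StreamBlock([...], value_class=MyStreamValue)); and because it is listed in MUTABLE_META_ATTRIBUTES, StreamField can forward it through set_meta_options, as the next file shows.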

View file

@@ -6,7 +6,7 @@ from django.db import models
from django.db.models.fields.json import KeyTransform
from django.utils.encoding import force_str
from wagtail.blocks import Block, BlockField, StreamBlock, StreamValue
from wagtail.blocks import Block, BlockField, StreamBlock
from wagtail.rich_text import (
RichTextMaxLengthValidator,
extract_references_from_rich_text,
@@ -88,7 +88,7 @@ class StreamField(models.Field):
# extract kwargs that are to be passed on to the block, not handled by super
block_opts = {}
for arg in ["min_num", "max_num", "block_counts", "collapsed"]:
for arg in ["min_num", "max_num", "block_counts", "collapsed", "value_class"]:
if arg in kwargs:
block_opts[arg] = kwargs.pop(arg)
@@ -111,6 +111,10 @@ class StreamField(models.Field):
self.stream_block.set_meta_options(block_opts)
@property
def value_class(self):
return self.stream_block.meta.value_class
@property
def json_field(self):
return models.JSONField(encoder=DjangoJSONEncoder)
@@ -142,7 +146,7 @@ class StreamField(models.Field):
def get_prep_value(self, value):
if (
isinstance(value, StreamValue)
isinstance(value, self.value_class)
and not (value)
and value.raw_text is not None
):
@@ -151,7 +155,7 @@ class StreamField(models.Field):
# for reverse migrations that convert StreamField data back into plain text
# fields.)
return value.raw_text
elif isinstance(value, StreamValue):
elif isinstance(value, self.value_class):
# StreamValue instances must be prepared first.
return json.dumps(
self.stream_block.get_prep_value(value), cls=DjangoJSONEncoder
@@ -163,7 +167,7 @@ class StreamField(models.Field):
return self.json_field.get_prep_value(value)
def get_db_prep_value(self, value, connection, prepared=False):
if not isinstance(value, StreamValue):
if not isinstance(value, self.value_class):
# When querying with JSONField features, the rhs might not be a StreamValue.
# As of Django 4.2, JSONField value serialisation is handled in
# get_db_prep_value instead of get_prep_value.

View file

@@ -0,0 +1,24 @@
# Generated by Django 4.2.11 on 2024-04-22 08:19
from django.db import migrations, models
import wagtail.blocks
import wagtail.fields
import wagtail.images.blocks
class Migration(migrations.Migration):
dependencies = [
('tests', '0036_complexdefaultstreampage'),
]
operations = [
migrations.CreateModel(
name='JSONCustomValueStreamModel',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('primary_content', wagtail.fields.StreamField([('text', wagtail.blocks.CharBlock()), ('rich_text', wagtail.blocks.RichTextBlock()), ('image', wagtail.images.blocks.ImageChooserBlock())])),
('secondary_content', wagtail.fields.StreamField([('text', wagtail.blocks.CharBlock()), ('rich_text', wagtail.blocks.RichTextBlock()), ('image', wagtail.images.blocks.ImageChooserBlock())])),
],
),
]

View file

@@ -42,6 +42,7 @@ from wagtail.blocks import (
RawHTMLBlock,
RichTextBlock,
StreamBlock,
StreamValue,
StructBlock,
)
from wagtail.contrib.forms.forms import FormBuilder
@@ -1552,6 +1553,40 @@ class JSONBlockCountsStreamModel(models.Model):
)
class CustomStreamValue(StreamValue):
"""
Used by ``CustomValueStreamBlock`` and ``JSONCustomValueStreamModel.primary_content`` (below)
to demonstrate support for custom value classes with ``StreamField`` and ``StreamBlock``.
"""
def level_of_customness(self) -> str:
return "medium"
class CustomValueStreamBlock(StreamBlock):
text = CharBlock()
rich_text = RichTextBlock()
image = ImageChooserBlock()
class Meta:
value_class = CustomStreamValue
class JSONCustomValueStreamModel(models.Model):
# `value_class` can be provided as an init kwarg to StreamField
primary_content = StreamField(
[
("text", CharBlock()),
("rich_text", RichTextBlock()),
("image", ImageChooserBlock()),
],
value_class=CustomStreamValue,
)
# `value_class` can be customised by overriding in StreamBlock.Meta
secondary_content = StreamField(CustomValueStreamBlock())
class ExtendedImageChooserBlock(ImageChooserBlock):
"""
Example of Block with custom get_api_representation method.

View file

@@ -20,7 +20,11 @@ from wagtail.blocks.base import get_error_json_data
from wagtail.blocks.field_block import FieldBlockAdapter
from wagtail.blocks.list_block import ListBlockAdapter, ListBlockValidationError
from wagtail.blocks.static_block import StaticBlockAdapter
from wagtail.blocks.stream_block import StreamBlockAdapter, StreamBlockValidationError
from wagtail.blocks.stream_block import (
StreamBlockAdapter,
StreamBlockValidationError,
StreamValue,
)
from wagtail.blocks.struct_block import StructBlockAdapter, StructBlockValidationError
from wagtail.models import Page
from wagtail.rich_text import RichText
@@ -3174,6 +3178,7 @@ class TestStreamBlock(WagtailTestUtils, SimpleTestCase):
)
self.assertEqual(list(block.child_blocks.keys()), ["heading", "paragraph"])
self.assertIs(block.value_class, StreamValue)
def test_initialisation_with_binary_string_names(self):
# migrations will sometimes write out names as binary strings, just to keep us on our toes
@@ -3186,6 +3191,20 @@ class TestStreamBlock(WagtailTestUtils, SimpleTestCase):
self.assertEqual(list(block.child_blocks.keys()), [b"heading", b"paragraph"])
def test_initialisation_with_custom_value_class(self):
class CustomStreamValue(StreamValue):
pass
block = blocks.StreamBlock(
[
("heading", blocks.CharBlock()),
("paragraph", blocks.CharBlock()),
],
value_class=CustomStreamValue,
)
self.assertIs(block.value_class, CustomStreamValue)
def test_initialisation_from_subclass(self):
class ArticleBlock(blocks.StreamBlock):
heading = blocks.CharBlock()

View file

@@ -17,7 +17,9 @@ from wagtail.rich_text import RichText
from wagtail.signal_handlers import disable_reference_index_auto_update
from wagtail.test.testapp.models import (
ComplexDefaultStreamPage,
CustomStreamValue,
JSONBlockCountsStreamModel,
JSONCustomValueStreamModel,
JSONMinMaxCountStreamModel,
JSONStreamModel,
StreamPage,
@@ -225,6 +227,38 @@ class TestStreamValueAccess(TestCase):
self.assertIsInstance(fetched_body[0].value, RichText)
self.assertEqual(fetched_body[0].value.source, "<h2>hello world</h2>")
def test_custom_value_class(self):
original_content = json.dumps([{"type": "text", "value": "foo"}])
obj = JSONCustomValueStreamModel.objects.create(
primary_content=original_content,
secondary_content=original_content,
)
# Both fields should return instances of CustomStreamValue
self.assertIsInstance(obj.primary_content, CustomStreamValue)
self.assertIsInstance(obj.secondary_content, CustomStreamValue)
# It should still be possible to update the fields using a plain list of (block_type, value) tuples
new_content = [("rich_text", RichText("<h2>hello world</h2>"))]
obj.primary_content = new_content
obj.secondary_content = new_content
obj.save()
obj.refresh_from_db(fields=["primary_content", "secondary_content"])
# CustomStreamValue is functionally equivalent to StreamValue, so the same value
# transformation should have taken place
for streamfield in ("primary_content", "secondary_content"):
with self.subTest(streamfield):
field_value = getattr(obj, streamfield)
self.assertEqual(len(field_value), 1)
self.assertIsInstance(field_value[0].value, RichText)
self.assertEqual(field_value[0].value.source, "<h2>hello world</h2>")
# The value is still an instance of the custom value class
self.assertIsInstance(field_value, CustomStreamValue)
# So, we can do this...
self.assertEqual(field_value.level_of_customness(), "medium")
def test_can_append(self):
self.json_body.body.append(("text", "bar"))
self.json_body.save()