sforkowany z mtyton/comfy
Merge pull request 'added product loading feature' (#4) from feature/product_sheet_load into main
Reviewed-on: mtyton/comfy#4main
commit
7c8ed587e3
|
@ -228,3 +228,5 @@ LOGGING = {
|
|||
"level": "WARNING",
|
||||
},
|
||||
}
|
||||
|
||||
PRODUCTS_CSV_PATH = os.environ.get("PRODUCTS_CSV_PATH", "products.csv")
|
||||
|
|
|
@ -1,3 +0,0 @@
|
|||
from django.contrib import admin
|
||||
|
||||
# Register your models here.
|
|
@ -1,6 +0,0 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class DocgeneratorConfig(AppConfig):
|
||||
default_auto_field = 'django.db.models.BigAutoField'
|
||||
name = 'docgenerator'
|
|
@ -1,44 +0,0 @@
|
|||
from abc import (
|
||||
ABC,
|
||||
abstractmethod
|
||||
)
|
||||
from typing import (
|
||||
Dict,
|
||||
Any
|
||||
)
|
||||
|
||||
from django.db.models import Model
|
||||
from docxtpl import DocxTemplate
|
||||
|
||||
|
||||
class DocumentGeneratorInterface(ABC):
|
||||
@abstractmethod
|
||||
def load_template(self, path: str):
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def get_extra_context(self) -> Dict[str, Any]:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def generate_file(self, context: Dict[str, Any] = None):
|
||||
...
|
||||
|
||||
|
||||
class BaseDocumentGenerator(DocumentGeneratorInterface):
|
||||
|
||||
def __init__(self, instance: Model) -> None:
|
||||
super().__init__()
|
||||
self.instance = instance
|
||||
|
||||
def load_template(self, path: str):
|
||||
return DocxTemplate(path)
|
||||
|
||||
def get_extra_context(self):
|
||||
return {}
|
||||
|
||||
|
||||
class PdfFromDocGenerator(BaseDocumentGenerator):
|
||||
def generate_file(self, context: Dict[str, Any] = None):
|
||||
template = self.load_template()
|
||||
context.update(self.get_extra_context())
|
|
@ -1,10 +0,0 @@
|
|||
from django.db import models
|
||||
from django.core.files.storage import Storage
|
||||
|
||||
|
||||
class DocumentTemplate(models.Model):
|
||||
name = models.CharField(max_length=255)
|
||||
file = models.FileField(upload_to="doc_templates/", )
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
|
@ -1,5 +0,0 @@
|
|||
from django.test import TestCase
|
||||
|
||||
|
||||
class PdfFromDocGeneratorTestCase(TestCase):
|
||||
...
|
|
@ -1,3 +0,0 @@
|
|||
from django.shortcuts import render
|
||||
|
||||
# Create your views here.
|
|
@ -10,3 +10,4 @@ factory-boy==3.2.1
|
|||
pdfkit==1.0.0
|
||||
num2words==0.5.12
|
||||
sentry-sdk==1.28.0
|
||||
pandas==2.0.3
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
import logging
|
||||
import requests
|
||||
import pandas as pd
|
||||
|
||||
from django.core import files
|
||||
|
||||
from store.models import (
|
||||
ProductTemplate,
|
||||
ProductCategoryParamValue,
|
||||
Product,
|
||||
ProductImage
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseLoader:
|
||||
def __init__(self, path):
|
||||
self.path = path
|
||||
|
||||
def load_data(self):
|
||||
return pd.read_csv(self.path)
|
||||
|
||||
|
||||
class TemplateLoader(BaseLoader):
|
||||
...
|
||||
|
||||
|
||||
class ProductLoader(BaseLoader):
|
||||
|
||||
def _get_images(self, row) -> list[files.ContentFile]:
|
||||
urls = row["images"]
|
||||
images = []
|
||||
for url in urls:
|
||||
response = requests.get(url, stream=True)
|
||||
if response.status_code == 200:
|
||||
data = response.raw
|
||||
file_name = url.split("/")[-1]
|
||||
image = files.ContentFile(data, name=file_name)
|
||||
images.append(image)
|
||||
return images
|
||||
|
||||
def _process_row(self, row):
|
||||
template = ProductTemplate.objects.get(code=row["template"])
|
||||
price = float(row["price"])
|
||||
name = row["name"]
|
||||
available = bool(row["available"])
|
||||
params = []
|
||||
for param in row["params"]:
|
||||
key, value = param
|
||||
param = ProductCategoryParamValue.objects.get(param__key=key, value=value)
|
||||
params.append(param)
|
||||
product = Product.objects.get_or_create_by_params(template=template, params=params)
|
||||
product.price = price
|
||||
product.name = name
|
||||
product.available = available
|
||||
|
||||
images = self._get_images(row)
|
||||
for i, image in enumerate(images):
|
||||
ProductImage.objects.create(product=product, image=image, is_main=bool(i==0))
|
||||
product.save()
|
||||
return product
|
||||
|
||||
def process(self):
|
||||
data = self.load_data()
|
||||
products = []
|
||||
for _, row in data.iterrows():
|
||||
try:
|
||||
product = self._process_row(row)
|
||||
except Exception as e:
|
||||
# catch any error and log it, GlitchTip will catch it
|
||||
logger.exception(str(e))
|
||||
else:
|
||||
products.append(product)
|
||||
logger.info(f"Loaded {len(products)} products")
|
|
@ -0,0 +1,13 @@
|
|||
from django.core.management import BaseCommand
|
||||
from django.conf import settings
|
||||
|
||||
from store.loader import ProductLoader
|
||||
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Load products from csv file"
|
||||
|
||||
def handle(self, *args, **options):
|
||||
loader = ProductLoader(settings.PRODUCTS_CSV_PATH)
|
||||
loader.process()
|
|
@ -22,6 +22,7 @@ from django.template import (
|
|||
)
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.db.models.signals import m2m_changed
|
||||
from django.forms import CheckboxSelectMultiple
|
||||
|
||||
from modelcluster.models import ClusterableModel
|
||||
from modelcluster.fields import ParentalKey
|
||||
|
@ -141,6 +142,7 @@ class ProductTemplate(ClusterableModel):
|
|||
title = models.CharField(max_length=255)
|
||||
code = models.CharField(max_length=255)
|
||||
description = models.TextField(blank=True)
|
||||
# TODO - add mechanism for enabling params
|
||||
|
||||
tags = TaggableManager()
|
||||
|
||||
|
@ -206,7 +208,8 @@ class Product(ClusterableModel):
|
|||
name = models.CharField(max_length=255, blank=True)
|
||||
template = models.ForeignKey(ProductTemplate, on_delete=models.CASCADE, related_name="products")
|
||||
params = models.ManyToManyField(
|
||||
ProductCategoryParamValue, blank=True, through="ProductParam"
|
||||
ProductCategoryParamValue, blank=True, through="ProductParam",
|
||||
limit_choices_to=models.Q(param__category=models.F("product__template__category"))
|
||||
)
|
||||
price = models.FloatField()
|
||||
available = models.BooleanField(default=True)
|
||||
|
@ -217,7 +220,7 @@ class Product(ClusterableModel):
|
|||
panels = [
|
||||
FieldPanel("template"),
|
||||
FieldPanel("price"),
|
||||
FieldPanel("params"),
|
||||
FieldPanel("params", widget=CheckboxSelectMultiple),
|
||||
FieldPanel("available"),
|
||||
FieldPanel("name"),
|
||||
InlinePanel("product_images", label="Variant Images"),
|
||||
|
@ -295,6 +298,8 @@ class ProductListPage(Page):
|
|||
tags = TaggableManager(blank=True)
|
||||
|
||||
def _get_items(self):
|
||||
if not self.pk:
|
||||
return ProductTemplate.objects.all()
|
||||
if self.tags.all():
|
||||
return ProductTemplate.objects.filter(tags__in=self.tags.all())
|
||||
return ProductTemplate.objects.all()
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
import pandas as pd
|
||||
from django.test import TestCase
|
||||
from unittest.mock import patch
|
||||
|
||||
from store.tests import factories
|
||||
from store.loader import ProductLoader
|
||||
|
||||
|
||||
class TestProductLoader(TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.category = factories.ProductCategoryFactory()
|
||||
self.template = factories.ProductTemplateFactory(category=self.category)
|
||||
self.category_params = [factories.ProductCategoryParamFactory(category=self.category) for _ in range(3)]
|
||||
self.category_param_values = [factories.ProductCategoryParamValueFactory(param=param) for param in self.category_params]
|
||||
|
||||
def test_load_products_single_product_success(self):
|
||||
fake_df = pd.DataFrame({
|
||||
"template": [self.template.code],
|
||||
"price": [10.0],
|
||||
"name": ["Test product"],
|
||||
"available": [True],
|
||||
"params": [[
|
||||
(self.category_params[0].key, self.category_param_values[0].value),
|
||||
(self.category_params[1].key, self.category_param_values[1].value),
|
||||
(self.category_params[2].key, self.category_param_values[2].value),
|
||||
]]
|
||||
})
|
||||
with patch("store.loader.BaseLoader.load_data", return_value=fake_df):
|
||||
loader = ProductLoader("fake_path")
|
||||
loader.process()
|
||||
|
||||
self.assertEqual(self.template.products.count(), 1)
|
||||
product = self.template.products.first()
|
||||
self.assertEqual(product.price, 10.0)
|
||||
self.assertEqual(product.name, "Test product")
|
||||
self.assertEqual(product.available, True)
|
||||
|
||||
@patch("store.loader.logger")
|
||||
def test_load_incorrect_data_types_failure(self, mock_logger):
|
||||
fake_df = pd.DataFrame({
|
||||
"template": [self.template.code],
|
||||
"price": ["FASDSADQAW"],
|
||||
"name": ["Test product"],
|
||||
"available": [True],
|
||||
"params": [[
|
||||
(self.category_params[0].key, self.category_param_values[0].value),
|
||||
(self.category_params[1].key, self.category_param_values[1].value),
|
||||
(self.category_params[2].key, self.category_param_values[2].value),
|
||||
]]
|
||||
})
|
||||
with patch("store.loader.BaseLoader.load_data", return_value=fake_df):
|
||||
loader = ProductLoader("fake_path")
|
||||
loader.process()
|
||||
|
||||
self.assertEqual(self.template.products.count(), 0)
|
||||
mock_logger.exception.assert_called_with("could not convert string to float: 'FASDSADQAW'")
|
||||
|
||||
@patch("store.loader.logger")
|
||||
def test_load_no_existing_template_code_failure(self, mock_logger):
|
||||
fake_df = pd.DataFrame({
|
||||
"template": ["NOTEEXISTINGTEMPLATE"],
|
||||
"price": [10.0],
|
||||
"name": ["Test product"],
|
||||
"available": [True],
|
||||
"params": [[
|
||||
(self.category_params[0].key, self.category_param_values[0].value),
|
||||
(self.category_params[1].key, self.category_param_values[1].value),
|
||||
(self.category_params[2].key, self.category_param_values[2].value),
|
||||
]]
|
||||
})
|
||||
with patch("store.loader.BaseLoader.load_data", return_value=fake_df):
|
||||
loader = ProductLoader("fake_path")
|
||||
loader.process()
|
||||
|
||||
self.assertEqual(self.template.products.count(), 0)
|
||||
mock_logger.exception.assert_called_with("ProductTemplate matching query does not exist.")
|
||||
|
||||
@patch("store.loader.logger")
|
||||
def test_not_existing_params_key_value_pairs_failure(self, mock_logger):
|
||||
fake_df = pd.DataFrame({
|
||||
"template": [self.template.code],
|
||||
"price": [10.0],
|
||||
"name": ["Test product"],
|
||||
"available": [True],
|
||||
"params": [[
|
||||
(self.category_params[0].key, self.category_param_values[2].value),
|
||||
(self.category_params[1].key, self.category_param_values[0].value),
|
||||
(self.category_params[2].key, self.category_param_values[1].value),
|
||||
]]
|
||||
})
|
||||
with patch("store.loader.BaseLoader.load_data", return_value=fake_df):
|
||||
loader = ProductLoader("fake_path")
|
||||
loader.process()
|
||||
|
||||
self.assertEqual(self.template.products.count(), 0)
|
||||
mock_logger.exception.assert_called_with("ProductCategoryParamValue matching query does not exist.")
|
||||
|
Ładowanie…
Reference in New Issue