This commit is contained in:
Parsa Nazer
2026-05-18 14:26:00 +03:30
parent ca478d71e1
commit 337e0723a8
15 changed files with 940 additions and 4 deletions
@@ -0,0 +1,16 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('account', '0035_shopmodel_customer_pickup_description_and_more'),
]
operations = [
migrations.AddField(
model_name='shopmodel',
name='torob_order_tracking_enabled',
field=models.BooleanField(default=False, verbose_name='فعال سازی پیگیری سفارش Torob'),
),
]
+1
View File
@@ -147,6 +147,7 @@ class ShopModel(models.Model):
shop_description = models.TextField(verbose_name='توضیحات فروشگاه')
commission_percent = models.DecimalField(max_digits=5, decimal_places=2, verbose_name='درصد کمیسیون')
telegram_chat_id = models.CharField(max_length=100, blank=True, null=True, verbose_name='شناسه چت تلگرام', help_text='برای ارسال خودکار فاکتورها به تلگرام')
torob_order_tracking_enabled = models.BooleanField(default=False, verbose_name='فعال سازی پیگیری سفارش Torob')
customer_pickup_title = models.CharField(max_length=40, verbose_name='عنوان دریافت حضوری', blank=True, null=True)
customer_pickup_description = models.CharField(max_length=500, verbose_name='توضیحات دریافت حضوری', blank=True, null=True)
+5 -1
View File
@@ -20,6 +20,10 @@ OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
TELEGRAM_BOT_TOKEN = '7068288679:AAGecMnyt9A6R78OQu8nQeISMK1LepX718g'
VAPID_PRIVATE_KEY = os.getenv("VAPID_PRIVATE_KEY")
TOROB_ORDER_TRACKING_ENABLED = os.getenv("TOROB_ORDER_TRACKING_ENABLED", "false").lower() == "true"
TOROB_PRODUCT_WEBHOOK_URL = os.getenv("TOROB_PRODUCT_WEBHOOK_URL", "https://api.torob.com/update/webhook/v1/")
TOROB_PRODUCT_WEBHOOK_TOKEN = os.getenv("TOROB_PRODUCT_WEBHOOK_TOKEN")
# Email Configuration
EMAIL_BACKEND = os.getenv("EMAIL_BACKEND")
EMAIL_HOST = os.getenv("EMAIL_HOST")
@@ -70,7 +74,7 @@ INSTALLED_APPS = [
'django_celery_beat',
'azbankgateways',
# Custom Apps
"product",
"product.apps.ProductConfig",
"account",
"ticket",
"chat",
+4
View File
@@ -5,10 +5,12 @@ from drf_spectacular.views import SpectacularSwaggerView, SpectacularAPIView
from django.conf import settings
from rest_framework_simplejwt.views import TokenObtainPairView
from product import views
from product.torob_api import TorobProductSyncView
from account.views import CustomTokenObtainPairView, TokenRefreshView
from home.views import HomeView
from .views import FakeAdminLoginView
from azbankgateways.urls import az_bank_gateways_urls
from order.torob_api import TorobOrderTrackingView
admin.autodiscover()
@@ -24,6 +26,8 @@ urlpatterns = [
path('admin/', FakeAdminLoginView.as_view()), # Fake admin
path('secret-admin/', admin.site.urls), # Real admin
path('schema/', SpectacularAPIView.as_view(), name='schema'),
path('torob_api/v3/products', TorobProductSyncView.as_view(), name='torob-product-sync'),
path('torob/v1/orders', TorobOrderTrackingView.as_view(), name='torob-order-tracking-root'),
path('products/', include('product.urls')),
path('accounts/', include('account.urls')),
path('chat/', include('chat.urls')),
@@ -0,0 +1,21 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('order', '0045_alter_ordermodel_created_at'),
]
operations = [
migrations.AddField(
model_name='ordermodel',
name='updated_at',
field=models.DateTimeField(auto_now=True, verbose_name='تاریخ آخرین بروزرسانی'),
),
migrations.AddField(
model_name='ordermodel',
name='torob_clid',
field=models.CharField(blank=True, db_index=True, max_length=128, null=True, verbose_name='شناسه Torob'),
),
]
+2
View File
@@ -173,7 +173,9 @@ class OrderModel(models.Model):
related_name='orders', null=True, verbose_name='ادرس')
created_at = models.DateTimeField(
auto_now_add=True, verbose_name="تاریخ ثبت سفارش")
updated_at = models.DateTimeField(auto_now=True, verbose_name="تاریخ آخرین بروزرسانی")
is_paid = models.BooleanField(default=False, verbose_name="وضعیت پرداخت")
torob_clid = models.CharField(max_length=128, blank=True, null=True, db_index=True, verbose_name='شناسه Torob')
discount_code = models.ForeignKey(
DiscountCode, on_delete=models.PROTECT, null=True, blank=True, verbose_name="کدتخفیف")
special_discount_code = models.ForeignKey(
+179
View File
@@ -0,0 +1,179 @@
from __future__ import annotations
from datetime import datetime, timezone as dt_timezone
import jwt
from django.conf import settings
from django.db.models import Q
from django.db.models import Prefetch
from django.utils.dateparse import parse_datetime
from rest_framework import serializers, status
from rest_framework.response import Response
from rest_framework.views import APIView
from account.models import ShopModel
from .models import OrderModel, OrderItemModel
TOROB_PUBLIC_KEY = """
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEAt6Mu4T0pBORY11W+QeM35UsmLO3vsf+6yKpFDEImFk0=
-----END PUBLIC KEY-----
"""
class TorobTokenError(Exception):
pass
class TorobOrderQuerySerializer(serializers.Serializer):
purchase_timestamp_gt = serializers.CharField(required=True)
limit = serializers.IntegerField(required=True, min_value=1, max_value=1000)
def validate_purchase_timestamp_gt(self, value):
parsed = parse_datetime(value.replace("Z", "+00:00"))
if parsed is None:
raise serializers.ValidationError("purchase_timestamp_gt must be a valid ISO 8601 timestamp.")
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt_timezone.utc)
return parsed.astimezone(dt_timezone.utc)
def _utc_iso(value: datetime) -> str:
return value.astimezone(dt_timezone.utc).isoformat().replace("+00:00", "Z")
def _shop_product_url(request, product) -> str:
domain = getattr(settings, "DOMAIN", None) or getattr(settings, "API_DOMAIN", None) or request.get_host()
if domain.startswith("http://") or domain.startswith("https://"):
base = domain.rstrip("/")
else:
base = f"https://{domain}".rstrip("/")
return f"{base}/product/{product.slug}/"
def _get_hostname_from_request(request) -> str:
"""Extract hostname for JWT audience validation.
Returns the full host including port if present, as JWT audience
should match exactly what Torob expects in the token.
"""
return request.get_host()
def _validate_torob_token(request) -> None:
token = request.headers.get("X-Torob-Token")
version = request.headers.get("X-Torob-Token-Version")
if version != "1":
raise TorobTokenError("invalid token version")
if not token:
raise TorobTokenError("missing token")
jwt.decode(
token,
key=TOROB_PUBLIC_KEY,
algorithms=["EdDSA"],
audience=_get_hostname_from_request(request),
)
def _resolve_order_status(order: OrderModel) -> str | None:
if order.status in {"RECEIVED", "POSTED"}:
return "completed"
if order.status == "CANCELED":
return "cancelled"
return None
def _order_shop_enabled(order: OrderModel) -> bool:
shop_ids = set()
for item in order.items.select_related("product__product__shop").all():
shop = getattr(item.product.product, "shop", None)
if shop is not None:
shop_ids.add(shop.id)
if not shop_ids:
return False
return ShopModel.objects.filter(id__in=shop_ids, torob_order_tracking_enabled=True).count() == len(shop_ids)
def _serialize_order(order: OrderModel, request) -> dict:
products = []
for item in order.items.select_related("product__product", "product__product__category").all():
product = item.product.product
products.append(
{
"product_url": _shop_product_url(request, product),
"product_price": int(item.price_after_special_discount() // item.quantity) if item.quantity else int(item.price),
"quantity": int(item.quantity),
}
)
payload = {
"order_id": str(order.id),
"purchase_timestamp": _utc_iso(order.created_at),
"last_updated_timestamp": _utc_iso(order.updated_at or order.created_at),
"torob_clid": order.torob_clid,
"status": _resolve_order_status(order),
"order_value": int(order.final_price or 0),
"phone_number": order.user.phone if order.user else None,
"products": products,
}
return {key: value for key, value in payload.items() if value is not None}
class TorobOrderTrackingView(APIView):
authentication_classes = []
permission_classes = []
def get(self, request):
if not settings.TOROB_ORDER_TRACKING_ENABLED:
return Response({"error": "order tracking is disabled"}, status=status.HTTP_403_FORBIDDEN)
try:
_validate_torob_token(request)
except TorobTokenError as exc:
return Response({"error": str(exc)}, status=status.HTTP_401_UNAUTHORIZED)
except jwt.PyJWTError:
return Response({"error": "invalid token"}, status=status.HTTP_401_UNAUTHORIZED)
serializer = TorobOrderQuerySerializer(data=request.query_params)
serializer.is_valid(raise_exception=True)
since = serializer.validated_data["purchase_timestamp_gt"]
limit = serializer.validated_data["limit"]
orders = (
OrderModel.objects.select_related("user")
.prefetch_related(
Prefetch(
"items",
queryset=OrderItemModel.objects.select_related("product__product", "product__product__shop"),
)
)
.filter(Q(created_at__gt=since) | Q(updated_at__gt=since))
.order_by("created_at", "id")
)
serialized = []
disabled_match_found = False
for order in orders:
status_value = _resolve_order_status(order)
if not status_value:
continue
if not order.torob_clid:
continue
if not _order_shop_enabled(order):
disabled_match_found = True
continue
serialized.append(_serialize_order(order, request))
if len(serialized) >= limit:
break
if not serialized and disabled_match_found:
return Response({"error": "order tracking access is disabled for this shop"}, status=status.HTTP_403_FORBIDDEN)
return Response({"success": True, "data": serialized}, status=status.HTTP_200_OK)
+2
View File
@@ -2,8 +2,10 @@ from django.conf.urls.static import static
from django.contrib import admin
from django.urls import path, include
from .views import *
from .torob_api import TorobOrderTrackingView
urlpatterns = [
path('torob/v1/orders', TorobOrderTrackingView.as_view(), name='torob-order-tracking'),
path('all', OrderlistView.as_view(), name='order-list'),
path('cart', CartView.as_view()),
path('cart/set-address', SetAddressForCartView.as_view()),
+8
View File
@@ -251,6 +251,13 @@ class PaymentView(APIView):
permission_classes = [IsAuthenticated]
serializer_class = BankTypeSerializer
def _get_torob_clid(self, request):
return (
request.data.get('torob_clid')
or request.COOKIES.get('torob_clid')
or request.headers.get('X-Torob-Clid')
)
@extend_schema(
description="choices=['BMI', 'SEP', 'ZARINPAL', 'IDPAY', 'ZIBAL', 'BAHAMTA', 'MELLAT', 'PAYV1']",
tags=['order payment']
@@ -338,6 +345,7 @@ class PaymentView(APIView):
user=request.user,
address=cart.address,
created_at=timezone.now().date(),
torob_clid=self._get_torob_clid(request),
discount_code=cart.discount_code,
special_discount_code=cart.special_discount_code,
discount_amount=cart.discount_code_amount,
+3
View File
@@ -5,3 +5,6 @@ class ProductConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'product'
verbose_name = 'محصول'
def ready(self):
from . import signals # noqa: F401
@@ -0,0 +1,31 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("product", "0073_productrating"),
]
operations = [
migrations.AddField(
model_name="productmodel",
name="updated_at",
field=models.DateTimeField(
auto_now=True,
blank=True,
null=True,
verbose_name="زمان آخرین بروزرسانی محصول",
),
),
migrations.AddField(
model_name="productvariant",
name="updated_at",
field=models.DateTimeField(
auto_now=True,
blank=True,
null=True,
verbose_name="زمان آخرین بروزرسانی تنوع",
),
),
]
+4
View File
@@ -199,6 +199,8 @@ class ProductModel(models.Model):
default=5, help_text='امتیاز محصول', verbose_name='متا ریتینگ')
created_at = models.DateTimeField(
auto_now_add=True, verbose_name='زمان ثبت محصول')
updated_at = models.DateTimeField(
auto_now=True, null=True, blank=True, verbose_name='زمان آخرین بروزرسانی محصول')
category = models.ForeignKey(SubCategoryModel, null=True, on_delete=models.SET_NULL,
related_name='products', verbose_name='دسته بندی محصول')
related_products = models.ManyToManyField(
@@ -410,6 +412,8 @@ class ProductVariant(DirtyFieldsMixin, models.Model):
ShowCaseSlider, verbose_name='دسته بندی پورسانتی', blank=True, null=True, on_delete=models.CASCADE)
created_at = models.DateTimeField(
auto_now_add=True, verbose_name='زمان ثبت محصول')
updated_at = models.DateTimeField(
auto_now=True, null=True, blank=True, verbose_name='زمان آخرین بروزرسانی تنوع')
def __str__(self):
return f"{self.product.name} - {', '.join(str(attr) for attr in self.product_attributes.all())}"
+21
View File
@@ -0,0 +1,21 @@
from __future__ import annotations
from django.conf import settings
from django.db.models.signals import post_save
from django.db import transaction
from django.dispatch import receiver
from .models import ProductModel, ProductVariant
from .tasks import send_torob_product_webhook
@receiver(post_save, sender=ProductModel)
def notify_torob_product_save(sender, instance, **kwargs):
if settings.TOROB_PRODUCT_WEBHOOK_TOKEN:
transaction.on_commit(lambda: send_torob_product_webhook.delay([instance.id]))
@receiver(post_save, sender=ProductVariant)
def notify_torob_variant_save(sender, instance, **kwargs):
if settings.TOROB_PRODUCT_WEBHOOK_TOKEN:
transaction.on_commit(lambda: send_torob_product_webhook.delay([instance.product_id]))
+156 -2
View File
@@ -1,6 +1,82 @@
from celery import shared_task
from order.models import OrderItemModel, OrderModel
from product.models import DollorModel, ProductVariant
from django.conf import settings
from django.db.models import Prefetch
import requests
import logging
import time
from urllib.parse import urlparse
from product.models import DollorModel, ProductVariant, ProductModel
logger = logging.getLogger(__name__)
TOROB_WEBHOOK_BATCH_SIZE = 100
TOROB_WEBHOOK_MIN_INTERVAL_SECONDS = 3.1
TOROB_WEBHOOK_MAX_RETRIES = 3
def _shop_product_url(product: ProductModel) -> str:
domain = getattr(settings, "DOMAIN", None) or getattr(settings, "API_DOMAIN", None) or ""
if domain.startswith("http://") or domain.startswith("https://"):
base = domain.rstrip("/")
else:
base = f"https://{domain}".rstrip("/") if domain else ""
return f"{base}/product/{product.slug}/"
def _variant_page_unique(product: ProductModel, variant: ProductVariant) -> str:
return f"{product.pk}_{variant.pk}"
def _chunks(items, size):
for idx in range(0, len(items), size):
yield items[idx: idx + size]
def _post_webhook_batch(url: str, headers: dict, batch_items: list[dict]) -> bool:
payload = {"items": batch_items}
backoff_seconds = 1.0
for attempt in range(1, TOROB_WEBHOOK_MAX_RETRIES + 1):
try:
response = requests.post(url, json=payload, headers=headers, timeout=10)
if response.status_code < 400:
return True
if response.status_code in (429, 500, 502, 503, 504):
logger.warning(
"Torob webhook transient failure (%s), attempt %s/%s",
response.status_code,
attempt,
TOROB_WEBHOOK_MAX_RETRIES,
)
if attempt < TOROB_WEBHOOK_MAX_RETRIES:
time.sleep(backoff_seconds)
backoff_seconds *= 2
continue
logger.error(
"Torob webhook failed with status %s: %s",
response.status_code,
response.text,
)
return False
except requests.RequestException as exc:
logger.warning(
"Torob webhook request exception on attempt %s/%s: %s",
attempt,
TOROB_WEBHOOK_MAX_RETRIES,
exc,
)
if attempt < TOROB_WEBHOOK_MAX_RETRIES:
time.sleep(backoff_seconds)
backoff_seconds *= 2
continue
return False
return False
@shared_task
def update_prices():
@@ -14,3 +90,81 @@ def update_prices():
for product in products:
product.set_or_update_price(dollor_price=dollor_price)
ProductVariant.objects.bulk_update(products, ['price'], batch_size=1000)
@shared_task
def send_torob_product_webhook(product_ids):
if not settings.TOROB_PRODUCT_WEBHOOK_TOKEN:
return {"sent": 0, "reason": "missing token"}
products = (
ProductModel.objects.filter(id__in=product_ids)
.select_related("category", "shop")
.prefetch_related(
Prefetch(
"variants",
queryset=ProductVariant.objects.prefetch_related(
"images",
"product_attributes__attribute_type"
).order_by("-discount", "price", "-created_at"),
)
)
)
url = settings.TOROB_PRODUCT_WEBHOOK_URL
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {settings.TOROB_PRODUCT_WEBHOOK_TOKEN}",
}
items = []
hosts = set()
for product in products:
if not product.slug:
continue
page_url = _shop_product_url(product)
parsed = urlparse(page_url)
if not (parsed.scheme in {"http", "https"} and parsed.netloc):
logger.warning("Skipping product %s due to invalid page_url: %s", product.pk, page_url)
continue
hosts.add(parsed.netloc.lower())
variants = list(product.variants.all())
if not variants:
continue
for variant in variants:
# Validate variant has images before sending to Torob
# Per spec: image_links is required, so skip variants without images
images = list(variant.images.all())
has_product_image = bool(product.image)
has_variant_images = bool(images)
if not (has_product_image or has_variant_images):
logger.debug(f"Skipping variant {variant.pk} for product {product.pk} - no images available")
continue
items.append(
{
"page_url": page_url,
"page_unique": _variant_page_unique(product, variant),
}
)
if not items:
return {"sent": 0, "reason": "no items"}
if len(hosts) > 1:
logger.error("Torob webhook items contain mixed domains: %s", sorted(hosts))
return {"sent": 0, "reason": "mixed domains"}
sent = 0
for batch in _chunks(items, TOROB_WEBHOOK_BATCH_SIZE):
ok = _post_webhook_batch(url=url, headers=headers, batch_items=batch)
if ok:
sent += len(batch)
time.sleep(TOROB_WEBHOOK_MIN_INTERVAL_SECONDS)
return {"sent": sent, "total": len(items)}
+486
View File
@@ -0,0 +1,486 @@
from __future__ import annotations
import logging
import math
from urllib.parse import urlparse
import jwt
from django.conf import settings
from django.core.paginator import EmptyPage, Paginator
from django.db.models import F, Prefetch
from django.db.models.functions import Coalesce
from rest_framework import serializers, status
from rest_framework.response import Response
from rest_framework.views import APIView
from .models import ProductModel, ProductVariant
logger = logging.getLogger(__name__)
TOROB_API_VERSION = "torob_api_v3"
TOROB_PAGE_SIZE = 100
TOROB_PUBLIC_KEY = """
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEAt6Mu4T0pBORY11W+QeM35UsmLO3vsf+6yKpFDEImFk0=
-----END PUBLIC KEY-----
"""
class TorobTokenError(Exception):
pass
class TorobProductsRequestSerializer(serializers.Serializer):
page_urls = serializers.ListField(
child=serializers.URLField(),
min_length=1,
required=False,
)
page_uniques = serializers.ListField(
child=serializers.CharField(max_length=200),
min_length=1,
required=False,
)
page = serializers.IntegerField(min_value=1, required=False)
sort = serializers.ChoiceField(
choices=("date_added_desc", "date_updated_desc"),
required=False,
)
def validate(self, attrs):
modes = [name for name in ("page_urls", "page_uniques", "page") if name in attrs]
if len(modes) != 1:
raise serializers.ValidationError(
"invalid request body"
)
if "page" in attrs and "sort" not in attrs:
raise serializers.ValidationError({"sort": "sort parameter is not provided"})
if "page" not in attrs and "sort" in attrs:
raise serializers.ValidationError({"sort": "sort parameter is invalid"})
return attrs
def _normalize_url(value: str) -> str:
parsed = urlparse(value)
path = parsed.path.rstrip("/")
return f"{parsed.scheme.lower()}://{parsed.netloc.lower()}{path}"
def _extract_slug_from_url(value: str) -> str | None:
path = urlparse(value).path.strip("/")
if not path:
return None
return path.split("/")[-1]
def _variant_page_unique(product: ProductModel, variant: ProductVariant) -> str:
return f"{product.pk}_{variant.pk}"
def _parse_page_unique(value: str) -> tuple[str | None, str | None]:
text = str(value).strip()
if not text:
return None, None
if "_" in text:
product_id, variant_id = text.split("_", 1)
return product_id.strip() or None, variant_id.strip() or None
return text, None
def _get_hostname_from_request(request) -> str:
"""Extract hostname for JWT audience validation.
Returns the full host including port if present, as JWT audience
should match exactly what Torob expects in the token.
"""
return request.get_host()
def _absolute_url(request, value: str) -> str:
if value.startswith("http://") or value.startswith("https://"):
return value
return request.build_absolute_uri(value)
def _shop_product_url(request, product: ProductModel) -> str:
domain = getattr(settings, "DOMAIN", None) or getattr(settings, "API_DOMAIN", None) or request.get_host()
if domain.startswith("http://") or domain.startswith("https://"):
base = domain.rstrip("/")
else:
base = f"https://{domain}".rstrip("/")
url = f"{base}/product/{product.slug}/"
# Per spec: page_url max 1500 chars
return url[:1500]
def _truncate_text(value: str | None, max_length: int) -> str | None:
if not value:
return None
return str(value)[:max_length]
def _variant_spec(variant: ProductVariant | None) -> dict:
if not variant:
return {}
spec: dict = {}
for attribute in variant.product_attributes.all():
attribute_type = getattr(attribute, "attribute_type", None)
key = getattr(attribute_type, "name", None) or "attribute"
if attribute.value is not None:
spec[key] = attribute.value
if variant.color:
spec.setdefault("color", variant.color)
if variant.in_stock is not None:
spec.setdefault("in_stock", variant.in_stock)
return spec
def _product_image_links(request, product: ProductModel, variant: ProductVariant) -> list[str]:
links: list[str] = []
max_image_url_length = 1000 # Per spec: image_links items max 1000 chars
def add_image_url(image_url: str | None) -> None:
if not image_url:
return
absolute = _absolute_url(request, image_url)
# Truncate to max 1000 chars per spec and avoid duplicates
if len(absolute) <= max_image_url_length and absolute not in links:
links.append(absolute)
if product.image:
add_image_url(product.image.url)
for image in variant.images.all():
if getattr(image, "image", None):
add_image_url(image.image.url)
return links
def _variant_date_added(product: ProductModel, variant: ProductVariant) -> str:
if variant.created_at:
return variant.created_at.isoformat()
return product.created_at.isoformat()
def _variant_date_updated(product: ProductModel, variant: ProductVariant) -> str:
if getattr(variant, "updated_at", None):
return variant.updated_at.isoformat()
if getattr(product, "updated_at", None):
return product.updated_at.isoformat()
return _variant_date_added(product, variant)
def _variant_sort_key(variant: ProductVariant) -> tuple:
return (-variant.discount, variant.price or 0, -(variant.created_at.timestamp() if variant.created_at else 0))
def _serialize_variant(request, product: ProductModel, variant: ProductVariant) -> dict:
# Robust price and availability check
has_valid_price = variant.price is not None and variant.price > 0
# Availability is True if:
# 1. Has valid price AND
# 2. Either stock is not tracked (None) OR stock > 0
if has_valid_price:
if variant.in_stock is None:
# Stock not tracked - assume available if has price
availability = True
else:
# Stock is tracked - check if > 0
availability = variant.in_stock > 0
else:
availability = False
if availability:
old_price = int(variant.price) if variant.discount else None
price_after_discount = variant.price_after_discount
current_price = int(round(price_after_discount)) if price_after_discount else int(variant.price)
else:
old_price = None
current_price = 0
payload = {
"page_unique": _variant_page_unique(product, variant),
"page_url": _shop_product_url(request, product),
"product_group_id": str(product.pk),
"title": _truncate_text(product.name, 500),
"subtitle": _truncate_text(product.meta_description, 500),
"current_price": current_price,
"availability": availability,
"category_name": _truncate_text(product.category.name if product.category else None, 200),
"image_links": _product_image_links(request, product, variant),
"spec": _variant_spec(variant),
"guarantee": None,
"short_desc": _truncate_text(product.description, 500),
"date_added": _variant_date_added(product, variant),
"date_updated": _variant_date_updated(product, variant),
"seller_name": product.shop.shop_name if product.shop else None,
"seller_city": _truncate_text(product.shop.city if product.shop else None, 200),
}
if old_price is not None and old_price > current_price:
payload["old_price"] = old_price
return payload
def _validate_torob_token(request) -> None:
token = request.headers.get("X-Torob-Token")
version = request.headers.get("X-Torob-Token-Version")
if version != "1":
logger.warning(f"Invalid token version: {version}")
raise TorobTokenError("invalid token version")
if not token:
logger.warning("Missing X-Torob-Token header")
raise TorobTokenError("missing token")
try:
jwt.decode(
token,
key=TOROB_PUBLIC_KEY,
algorithms=["EdDSA"],
audience=_get_hostname_from_request(request),
)
logger.debug("Token validated successfully")
except jwt.ExpiredSignatureError:
logger.warning("Token has expired")
raise
except jwt.InvalidAudienceError:
logger.warning(f"Audience mismatch for request from {request.get_host()}")
raise
def _extract_error_message(errors: dict | list | str) -> str:
"""Extract the first error message from serializer errors."""
if isinstance(errors, dict):
# Get first error from dict
first_key = next(iter(errors.keys()), None)
if first_key:
first_value = errors[first_key]
if isinstance(first_value, list) and first_value:
return str(first_value[0])
return str(first_value)
return "Invalid request"
elif isinstance(errors, list) and errors:
return str(errors[0])
return str(errors) if errors else "Invalid request"
class TorobProductSyncView(APIView):
authentication_classes = []
permission_classes = []
def post(self, request):
# Validate Content-Type header
content_type = request.META.get('CONTENT_TYPE', '').split(';')[0].strip()
if content_type != 'application/json':
logger.warning(f"Invalid Content-Type: {content_type}")
return Response(
{"error": "Content-Type must be application/json"},
status=status.HTTP_400_BAD_REQUEST
)
try:
_validate_torob_token(request)
except TorobTokenError as exc:
return Response({"error": str(exc)}, status=status.HTTP_401_UNAUTHORIZED)
except jwt.PyJWTError as exc:
logger.warning(f"JWT validation failed: {exc}")
return Response({"error": "invalid token"}, status=status.HTTP_401_UNAUTHORIZED)
serializer = TorobProductsRequestSerializer(data=request.data)
if not serializer.is_valid():
error_message = _extract_error_message(serializer.errors)
logger.warning(f"Request validation failed: {error_message}")
return Response({"error": error_message}, status=status.HTTP_400_BAD_REQUEST)
data = serializer.validated_data
products_qs = ProductModel.objects.select_related(
"category",
"category__parent",
"shop",
).filter(slug__isnull=False).exclude(slug="")
base_qs = products_qs.prefetch_related(
Prefetch(
"variants",
queryset=ProductVariant.objects.select_related("product").prefetch_related(
"product_attributes__attribute_type",
"images",
).order_by("-created_at", "-pk"),
)
)
if "page_urls" in data:
requested_urls = data["page_urls"]
requested_slugs = [
slug for slug in (_extract_slug_from_url(url) for url in requested_urls)
if slug
]
products = list(base_qs.filter(slug__in=requested_slugs))
product_by_slug = {product.slug: product for product in products}
product_by_url = {
_normalize_url(_shop_product_url(request, product)): product
for product in products
}
ordered_products = []
for url in requested_urls:
slug = _extract_slug_from_url(url)
normalized_url = _normalize_url(url)
product = product_by_slug.get(slug) if slug else None
if product is None:
product = product_by_url.get(normalized_url)
if product is not None and product not in ordered_products:
ordered_products.append(product)
serialized_products = []
for product in ordered_products:
variants = list(product.variants.all())
if not variants:
continue
variants.sort(key=_variant_sort_key)
for variant in variants:
image_links = _product_image_links(request, product, variant)
# Skip variants without images as per spec requirement
if not image_links:
continue
serialized_products.append(_serialize_variant(request, product, variant))
return Response(
{
"api_version": TOROB_API_VERSION,
"current_page": 1,
"total": len(serialized_products),
"max_pages": max(1, math.ceil(len(serialized_products) / TOROB_PAGE_SIZE)),
"products": serialized_products,
},
status=status.HTTP_200_OK,
)
if "page_uniques" in data:
requested_uniques = [str(unique) for unique in data["page_uniques"]]
parsed_uniques = [_parse_page_unique(unique) for unique in requested_uniques]
product_ids = {product_id for product_id, _ in parsed_uniques if product_id}
variant_ids = {variant_id for _, variant_id in parsed_uniques if variant_id}
products = list(base_qs.filter(pk__in=product_ids))
product_by_id = {str(product.pk): product for product in products}
variant_map: dict[str, dict[str, ProductVariant]] = {}
for product in products:
variant_map[str(product.pk)] = {str(variant.pk): variant for variant in product.variants.all()}
serialized_products = []
seen = set()
for product_id, variant_id in parsed_uniques:
if not product_id:
continue
product = product_by_id.get(product_id)
if product is None:
continue
if variant_id:
variant = variant_map.get(product_id, {}).get(variant_id)
if not variant:
continue
image_links = _product_image_links(request, product, variant)
# Skip variants without images
if not image_links:
continue
page_unique = _variant_page_unique(product, variant)
if page_unique in seen:
continue
seen.add(page_unique)
serialized_products.append(_serialize_variant(request, product, variant))
continue
variants = list(product.variants.all())
if not variants:
continue
variants.sort(key=_variant_sort_key)
for variant in variants:
image_links = _product_image_links(request, product, variant)
# Skip variants without images
if not image_links:
continue
page_unique = _variant_page_unique(product, variant)
if page_unique in seen:
continue
seen.add(page_unique)
serialized_products.append(_serialize_variant(request, product, variant))
return Response(
{
"api_version": TOROB_API_VERSION,
"current_page": 1,
"total": len(serialized_products),
"max_pages": max(1, math.ceil(len(serialized_products) / TOROB_PAGE_SIZE)),
"products": serialized_products,
},
status=status.HTTP_200_OK,
)
sort = data["sort"]
page_number = data["page"]
variants_qs = (
ProductVariant.objects.select_related(
"product",
"product__category",
"product__category__parent",
"product__shop",
)
.prefetch_related("product_attributes__attribute_type", "images")
.filter(product__slug__isnull=False)
.exclude(product__slug="")
.annotate(
product_created_at=F("product__created_at"),
variant_updated_sort=Coalesce("updated_at", "created_at"),
)
)
if sort == "date_updated_desc":
variants_qs = variants_qs.order_by("-variant_updated_sort", "-pk")
else:
variants_qs = variants_qs.order_by("-created_at", "-pk")
paginator = Paginator(variants_qs, TOROB_PAGE_SIZE)
total = paginator.count
max_pages = max(1, math.ceil(total / TOROB_PAGE_SIZE))
try:
page_obj = paginator.page(page_number)
page_variants = list(page_obj.object_list)
except EmptyPage:
page_variants = []
serialized_products = [
_serialize_variant(request, variant.product, variant)
for variant in page_variants
]
return Response(
{
"api_version": TOROB_API_VERSION,
"current_page": page_number,
"total": total,
"max_pages": max_pages,
"products": serialized_products,
},
status=status.HTTP_200_OK,
)