Files
hossein-por-shop/backend/product/tasks.py
Parsa Nazer 337e0723a8 torob
2026-05-18 14:26:00 +03:30

170 lines
5.4 KiB
Python

from celery import shared_task
from django.conf import settings
from django.db.models import Prefetch
import requests
import logging
import time
from urllib.parse import urlparse
from product.models import DollorModel, ProductVariant, ProductModel
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Maximum number of items sent to the Torob webhook in a single POST.
TOROB_WEBHOOK_BATCH_SIZE = 100
# Seconds slept after each batch POST before the next batch is sent.
TOROB_WEBHOOK_MIN_INTERVAL_SECONDS = 3.1
# Maximum number of POST attempts per batch (the first attempt included).
TOROB_WEBHOOK_MAX_RETRIES = 3
def _shop_product_url(product: ProductModel) -> str:
    """Return the storefront page URL for ``product``.

    The host is read from ``settings.DOMAIN``, falling back to
    ``settings.API_DOMAIN``. A value that does not already start with
    ``http://`` or ``https://`` is prefixed with ``https://``. When neither
    setting holds a value, the result is the host-less path
    ``/product/<slug>/``.
    """
    configured = (
        getattr(settings, "DOMAIN", None)
        or getattr(settings, "API_DOMAIN", None)
        or ""
    )
    if not configured:
        root = ""
    elif configured.startswith(("http://", "https://")):
        root = configured.rstrip("/")
    else:
        root = f"https://{configured}".rstrip("/")
    return f"{root}/product/{product.slug}/"
def _variant_page_unique(product: ProductModel, variant: ProductVariant) -> str:
    """Return the ``page_unique`` key for a variant: ``<product pk>_<variant pk>``."""
    return "{}_{}".format(product.pk, variant.pk)
def _chunks(items, size):
for idx in range(0, len(items), size):
yield items[idx: idx + size]
def _post_webhook_batch(url: str, headers: dict, batch_items: list[dict]) -> bool:
    """POST one batch of items to the webhook, retrying transient failures.

    At most ``TOROB_WEBHOOK_MAX_RETRIES`` attempts are made. Responses with
    status 429, 500, 502, 503 or 504 and ``requests.RequestException`` errors
    are retried after a sleep that starts at one second and doubles after
    each retry. Any other status of 400 or above fails immediately.

    Args:
        url: Webhook endpoint.
        headers: HTTP headers sent with every attempt.
        batch_items: Items placed under the ``"items"`` key of the JSON body.

    Returns:
        ``True`` if an attempt received a status below 400, else ``False``.
    """
    retryable_statuses = (429, 500, 502, 503, 504)
    body = {"items": batch_items}
    wait = 1.0
    for attempt in range(1, TOROB_WEBHOOK_MAX_RETRIES + 1):
        attempts_left = attempt < TOROB_WEBHOOK_MAX_RETRIES
        try:
            response = requests.post(url, json=body, headers=headers, timeout=10)
        except requests.RequestException as exc:
            logger.warning(
                "Torob webhook request exception on attempt %s/%s: %s",
                attempt,
                TOROB_WEBHOOK_MAX_RETRIES,
                exc,
            )
            if not attempts_left:
                return False
        else:
            if response.status_code < 400:
                return True
            transient = response.status_code in retryable_statuses
            if transient:
                logger.warning(
                    "Torob webhook transient failure (%s), attempt %s/%s",
                    response.status_code,
                    attempt,
                    TOROB_WEBHOOK_MAX_RETRIES,
                )
            # Permanent errors, and transient ones on the final attempt,
            # are reported as errors and end the retry loop.
            if not (transient and attempts_left):
                logger.error(
                    "Torob webhook failed with status %s: %s",
                    response.status_code,
                    response.text,
                )
                return False
        # Only retryable outcomes with attempts remaining reach this point.
        time.sleep(wait)
        wait *= 2
    return False
@shared_task
def update_prices():
    """Update the stored dollar price, then recompute every variant's price.

    The ``DollorModel`` row with ``unique_filed='unique'`` is fetched (or
    created), has ``update_price()`` called on it and is saved. Every
    ``ProductVariant`` then gets ``set_or_update_price`` called with that
    row's ``price``, and the ``price`` field is written back with
    ``bulk_update`` in batches of 1000.
    """
    # Update the dollar row first so variants use its current price.
    rate_row = DollorModel.objects.get_or_create(unique_filed='unique')[0]
    rate_row.update_price()
    rate_row.save()
    current_rate = rate_row.price

    # Materialise the queryset: the same instances are mutated and then
    # handed to bulk_update.
    variants = list(ProductVariant.objects.all())
    for variant in variants:
        variant.set_or_update_price(dollor_price=current_rate)
    ProductVariant.objects.bulk_update(variants, ['price'], batch_size=1000)
def _collect_torob_items(products):
    """Build the webhook items for ``products`` and collect the hosts used.

    Products without a slug or with an invalid page URL are skipped, as are
    variants for which neither the product nor the variant has an image.

    Returns:
        A ``(items, hosts)`` tuple: ``items`` is a list of dicts with
        ``page_url`` and ``page_unique`` keys, ``hosts`` is the set of
        lower-cased network locations of the page URLs that were accepted.
    """
    items = []
    hosts = set()
    for product in products:
        if not product.slug:
            continue
        page_url = _shop_product_url(product)
        parsed = urlparse(page_url)
        if parsed.scheme not in {"http", "https"} or not parsed.netloc:
            logger.warning("Skipping product %s due to invalid page_url: %s", product.pk, page_url)
            continue
        hosts.add(parsed.netloc.lower())
        # The product-level image is the same for every variant, so check it once.
        has_product_image = bool(product.image)
        for variant in product.variants.all():
            # Per spec: image_links is required, so skip variants that have
            # neither a product image nor images of their own.
            if not has_product_image and not variant.images.all():
                logger.debug(
                    "Skipping variant %s for product %s - no images available",
                    variant.pk,
                    product.pk,
                )
                continue
            items.append(
                {
                    "page_url": page_url,
                    "page_unique": _variant_page_unique(product, variant),
                }
            )
    return items, hosts


@shared_task
def send_torob_product_webhook(product_ids):
    """Send the variants of the given products to the Torob product webhook.

    One item (``page_url`` + ``page_unique``) is built per variant and the
    items are POSTed in batches of ``TOROB_WEBHOOK_BATCH_SIZE``, sleeping
    ``TOROB_WEBHOOK_MIN_INTERVAL_SECONDS`` after each batch.

    Args:
        product_ids: Primary keys of the ``ProductModel`` rows to report.

    Returns:
        ``{"sent": <int>, "total": <int>}`` when batches were posted, where
        ``sent`` counts items in batches that succeeded; otherwise
        ``{"sent": 0, "reason": <str>}`` with reason ``"missing token"``,
        ``"missing url"``, ``"no items"`` or ``"mixed domains"``.
    """
    # Read settings defensively so a missing setting disables the task
    # instead of raising AttributeError inside the worker.
    token = getattr(settings, "TOROB_PRODUCT_WEBHOOK_TOKEN", None)
    if not token:
        return {"sent": 0, "reason": "missing token"}
    url = getattr(settings, "TOROB_PRODUCT_WEBHOOK_URL", None)
    if not url:
        # Without a URL every batch would fail after exhausting its retries.
        logger.error("Torob webhook URL is not configured")
        return {"sent": 0, "reason": "missing url"}
    if not product_ids:
        # Nothing to look up; skip the database round trip.
        return {"sent": 0, "reason": "no items"}

    products = (
        ProductModel.objects.filter(id__in=product_ids)
        .select_related("category", "shop")
        .prefetch_related(
            Prefetch(
                "variants",
                queryset=ProductVariant.objects.prefetch_related(
                    "images",
                    "product_attributes__attribute_type"
                ).order_by("-discount", "price", "-created_at"),
            )
        )
    )
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }

    items, hosts = _collect_torob_items(products)
    if not items:
        return {"sent": 0, "reason": "no items"}
    if len(hosts) > 1:
        logger.error("Torob webhook items contain mixed domains: %s", sorted(hosts))
        return {"sent": 0, "reason": "mixed domains"}

    sent = 0
    for batch in _chunks(items, TOROB_WEBHOOK_BATCH_SIZE):
        if _post_webhook_batch(url=url, headers=headers, batch_items=batch):
            sent += len(batch)
        # NOTE(review): this also sleeps after the final batch; presumably
        # that spaces out back-to-back task runs — confirm before removing.
        time.sleep(TOROB_WEBHOOK_MIN_INTERVAL_SECONDS)
    return {"sent": sent, "total": len(items)}