from celery import shared_task from django.conf import settings from django.db.models import Prefetch import requests import logging import time from urllib.parse import urlparse from product.models import DollorModel, ProductVariant, ProductModel logger = logging.getLogger(__name__) TOROB_WEBHOOK_BATCH_SIZE = 100 TOROB_WEBHOOK_MIN_INTERVAL_SECONDS = 3.1 TOROB_WEBHOOK_MAX_RETRIES = 3 def _shop_product_url(product: ProductModel) -> str: domain = getattr(settings, "DOMAIN", None) or getattr(settings, "API_DOMAIN", None) or "" if domain.startswith("http://") or domain.startswith("https://"): base = domain.rstrip("/") else: base = f"https://{domain}".rstrip("/") if domain else "" return f"{base}/product/{product.slug}/" def _variant_page_unique(product: ProductModel, variant: ProductVariant) -> str: return f"{product.pk}_{variant.pk}" def _chunks(items, size): for idx in range(0, len(items), size): yield items[idx: idx + size] def _post_webhook_batch(url: str, headers: dict, batch_items: list[dict]) -> bool: payload = {"items": batch_items} backoff_seconds = 1.0 for attempt in range(1, TOROB_WEBHOOK_MAX_RETRIES + 1): try: response = requests.post(url, json=payload, headers=headers, timeout=10) if response.status_code < 400: return True if response.status_code in (429, 500, 502, 503, 504): logger.warning( "Torob webhook transient failure (%s), attempt %s/%s", response.status_code, attempt, TOROB_WEBHOOK_MAX_RETRIES, ) if attempt < TOROB_WEBHOOK_MAX_RETRIES: time.sleep(backoff_seconds) backoff_seconds *= 2 continue logger.error( "Torob webhook failed with status %s: %s", response.status_code, response.text, ) return False except requests.RequestException as exc: logger.warning( "Torob webhook request exception on attempt %s/%s: %s", attempt, TOROB_WEBHOOK_MAX_RETRIES, exc, ) if attempt < TOROB_WEBHOOK_MAX_RETRIES: time.sleep(backoff_seconds) backoff_seconds *= 2 continue return False return False @shared_task def update_prices(): # update dollor dollor_object, _ = DollorModel.objects.get_or_create(unique_filed='unique') dollor_object.update_price() dollor_object.save() dollor_price = dollor_object.price products = list(ProductVariant.objects.all()) for product in products: product.set_or_update_price(dollor_price=dollor_price) ProductVariant.objects.bulk_update(products, ['price'], batch_size=1000) @shared_task def send_torob_product_webhook(product_ids): if not settings.TOROB_PRODUCT_WEBHOOK_TOKEN: return {"sent": 0, "reason": "missing token"} products = ( ProductModel.objects.filter(id__in=product_ids) .select_related("category", "shop") .prefetch_related( Prefetch( "variants", queryset=ProductVariant.objects.prefetch_related( "images", "product_attributes__attribute_type" ).order_by("-discount", "price", "-created_at"), ) ) ) url = settings.TOROB_PRODUCT_WEBHOOK_URL headers = { "Content-Type": "application/json", "Authorization": f"Bearer {settings.TOROB_PRODUCT_WEBHOOK_TOKEN}", } items = [] hosts = set() for product in products: if not product.slug: continue page_url = _shop_product_url(product) parsed = urlparse(page_url) if not (parsed.scheme in {"http", "https"} and parsed.netloc): logger.warning("Skipping product %s due to invalid page_url: %s", product.pk, page_url) continue hosts.add(parsed.netloc.lower()) variants = list(product.variants.all()) if not variants: continue for variant in variants: # Validate variant has images before sending to Torob # Per spec: image_links is required, so skip variants without images images = list(variant.images.all()) has_product_image = bool(product.image) has_variant_images = bool(images) if not (has_product_image or has_variant_images): logger.debug(f"Skipping variant {variant.pk} for product {product.pk} - no images available") continue items.append( { "page_url": page_url, "page_unique": _variant_page_unique(product, variant), } ) if not items: return {"sent": 0, "reason": "no items"} if len(hosts) > 1: logger.error("Torob webhook items contain mixed domains: %s", sorted(hosts)) return {"sent": 0, "reason": "mixed domains"} sent = 0 for batch in _chunks(items, TOROB_WEBHOOK_BATCH_SIZE): ok = _post_webhook_batch(url=url, headers=headers, batch_items=batch) if ok: sent += len(batch) time.sleep(TOROB_WEBHOOK_MIN_INTERVAL_SECONDS) return {"sent": sent, "total": len(items)}