import logging import os import secrets from hashlib import sha256 import ipaddress from dataclasses import dataclass from datetime import timedelta from django.conf import settings from django.contrib.auth.hashers import check_password, make_password from django.utils import timezone from django.utils.translation import gettext_lazy as _ from apps.accounts.models import OtpChannel, OtpPurpose, PhoneOTP logger = logging.getLogger(__name__) @dataclass class OtpSendResult: request_id: str expires_at: str class OtpRateLimitError(RuntimeError): def __init__(self, retry_after_seconds: int): super().__init__(_("Too many OTP requests. Try again later.")) self.retry_after_seconds = retry_after_seconds class OtpCooldownError(RuntimeError): def __init__(self, retry_after_seconds: int): super().__init__(_("Please wait before requesting another code.")) self.retry_after_seconds = retry_after_seconds class OtpIpRateLimitError(RuntimeError): def __init__(self, retry_after_seconds: int): super().__init__(_("Too many OTP requests from this IP. Try again later.")) self.retry_after_seconds = retry_after_seconds class OtpDeviceRateLimitError(RuntimeError): def __init__(self, retry_after_seconds: int): super().__init__(_("Too many OTP requests from this device. Try again later.")) self.retry_after_seconds = retry_after_seconds class BaseOtpProvider: uses_provider_otp = False def send_sms(self, to_number: str, message: str) -> None: raise NotImplementedError def send_whatsapp(self, to_number: str, message: str) -> None: raise NotImplementedError def send_otp(self, to_number: str, channel: str) -> None: raise NotImplementedError def verify_otp(self, to_number: str, code: str) -> bool: raise NotImplementedError class ConsoleOtpProvider(BaseOtpProvider): def send_sms(self, to_number: str, message: str) -> None: logger.info("OTP SMS to %s: %s", to_number, message) def send_whatsapp(self, to_number: str, message: str) -> None: logger.info("OTP WhatsApp to %s: %s", to_number, message) class AuthenticaOtpProvider(BaseOtpProvider): """Authentica provider for OTP delivery (SMS/WhatsApp) plus custom SMS messaging.""" uses_provider_otp = True def __init__(self) -> None: self.api_key = os.getenv("AUTHENTICA_API_KEY") self.base_url = os.getenv("AUTHENTICA_BASE_URL", "https://api.authentica.sa") self.timeout_seconds = float(os.getenv("AUTHENTICA_TIMEOUT_SECONDS", "10")) self.sender_name = os.getenv("AUTHENTICA_SENDER_NAME") def _assert_config(self) -> None: if not self.api_key: raise ValueError(_("Authentica API key is not configured")) def _headers(self) -> dict: return { "Accept": "application/json", "Content-Type": "application/json", "X-Authorization": self.api_key, } def _post(self, path: str, payload: dict) -> dict: import requests self._assert_config() base_url = self.base_url.rstrip("/") url = f"{base_url}{path}" try: response = requests.post( url, headers=self._headers(), json=payload, timeout=self.timeout_seconds, ) except requests.RequestException as exc: raise RuntimeError(_("Authentica request failed")) from exc try: data = response.json() except ValueError: data = {"detail": response.text} if not response.ok: if os.getenv("AUTHENTICA_DEBUG") == "1" or os.getenv("AUTHENTICA_E2E") == "1": raise RuntimeError( _("Authentica request failed: %(status)s %(body)s") % {"status": response.status_code, "body": response.text} ) raise RuntimeError(_("Authentica request failed")) return data def send_otp(self, to_number: str, channel: str) -> None: if channel not in (OtpChannel.SMS, OtpChannel.WHATSAPP): raise ValueError(_("Unsupported OTP channel")) method = "sms" if channel == OtpChannel.SMS else "whatsapp" self._post("/api/v2/send-otp", {"method": method, "phone": to_number}) def verify_otp(self, to_number: str, code: str) -> bool: data = self._post("/api/v2/verify-otp", {"phone": to_number, "otp": code}) if "verified" in data: verified = bool(data.get("verified")) else: verified = bool(data.get("status")) or data.get("message") == "OTP verified successfully" if not verified and (os.getenv("AUTHENTICA_DEBUG") == "1" or os.getenv("AUTHENTICA_E2E") == "1"): raise RuntimeError(_("Authentica verify failed: %(response)s") % {"response": data}) return verified def send_sms(self, to_number: str, message: str) -> None: if not self.sender_name: raise ValueError(_("Authentica sender name is not configured")) self._post( "/api/v2/send-sms", { "phone": to_number, "message": message, "sender_name": self.sender_name, }, ) def send_whatsapp(self, to_number: str, message: str) -> None: raise ValueError(_("Authentica WhatsApp messaging is not supported")) PROVIDERS = { "console": ConsoleOtpProvider, "authentica": AuthenticaOtpProvider, } def _get_provider_for_key(provider_key: str) -> BaseOtpProvider: provider_cls = PROVIDERS.get(provider_key) if not provider_cls: raise ValueError(_("Unknown OTP provider: %(provider)s") % {"provider": provider_key}) return provider_cls() def get_provider() -> BaseOtpProvider: return _get_provider_for_key(settings.OTP_PROVIDER) def generate_code(length: int = 6) -> str: digits = "0123456789" return "".join(secrets.choice(digits) for _ in range(length)) def _compute_retry_after_seconds(oldest_recent, now, window_minutes: int, floor_seconds: int = 1) -> int: if oldest_recent: retry_after = int((oldest_recent.created_at + timedelta(minutes=window_minutes) - now).total_seconds()) else: retry_after = floor_seconds return max(retry_after, floor_seconds) def normalize_request_ip(raw_ip: str | None) -> str | None: if not raw_ip: return None candidate = raw_ip.split(",")[0].strip() try: return str(ipaddress.ip_address(candidate)) except ValueError: return None def build_device_signal(device_id: str | None, user_agent: str | None, accept_language: str | None) -> str: # Prefer explicit device id; fallback to passive headers for coarse signal. source = (device_id or "").strip() if not source: source = f"{(user_agent or '').strip()}|{(accept_language or '').strip()}" if not source.strip("|"): return "" return f"sha256:{sha256(source.encode('utf-8')).hexdigest()[:24]}" def enforce_phone_auth_request_limits(request_ip: str | None, device_signal: str | None) -> None: now = timezone.now() window_minutes = getattr(settings, "PHONE_AUTH_RISK_WINDOW_MINUTES", 15) window_start = now - timedelta(minutes=window_minutes) ip_limit = getattr(settings, "PHONE_AUTH_IP_MAX_PER_WINDOW", 20) device_limit = getattr(settings, "PHONE_AUTH_DEVICE_MAX_PER_WINDOW", 20) if request_ip and ip_limit > 0: ip_recent = PhoneOTP.objects.filter( purpose=OtpPurpose.AUTH, request_ip=request_ip, created_at__gte=window_start, ) if ip_recent.count() >= ip_limit: oldest_recent = ip_recent.order_by("created_at").first() raise OtpIpRateLimitError( retry_after_seconds=_compute_retry_after_seconds(oldest_recent, now, window_minutes) ) if device_signal and device_limit > 0: device_recent = PhoneOTP.objects.filter( purpose=OtpPurpose.AUTH, device_signal=device_signal, created_at__gte=window_start, ) if device_recent.count() >= device_limit: oldest_recent = device_recent.order_by("created_at").first() raise OtpDeviceRateLimitError( retry_after_seconds=_compute_retry_after_seconds(oldest_recent, now, window_minutes) ) def create_and_send_otp( phone_number: str, channel: str, purpose: str = OtpPurpose.AUTH, request_ip: str | None = None, device_signal: str = "", ) -> OtpSendResult: provider = get_provider() now = timezone.now() window_minutes = getattr(settings, "OTP_WINDOW_MINUTES", 15) max_per_window = getattr(settings, "OTP_MAX_PER_WINDOW", 5) cooldown_seconds = getattr(settings, "OTP_RESEND_COOLDOWN_SECONDS", 60) window_start = now - timedelta(minutes=window_minutes) recent_qs = PhoneOTP.objects.filter(phone_number=phone_number, created_at__gte=window_start) if recent_qs.count() >= max_per_window: oldest_recent = recent_qs.order_by("created_at").first() raise OtpRateLimitError( retry_after_seconds=_compute_retry_after_seconds(oldest_recent, now, window_minutes, cooldown_seconds) ) latest = ( PhoneOTP.objects.filter(phone_number=phone_number, channel=channel) .order_by("-created_at") .first() ) if latest and latest.verified_at is None: elapsed = (now - latest.created_at).total_seconds() if elapsed < cooldown_seconds: raise OtpCooldownError(retry_after_seconds=int(cooldown_seconds - elapsed)) if provider.uses_provider_otp: code_hash = make_password(secrets.token_urlsafe(16)) message = None else: code = generate_code() code_hash = make_password(code) message = _( "Your verification code is %(code)s. It expires in %(minutes)s minutes." ) % {"code": code, "minutes": settings.OTP_EXPIRY_MINUTES} otp = PhoneOTP.objects.create( phone_number=phone_number, channel=channel, purpose=purpose, provider=settings.OTP_PROVIDER, code_hash=code_hash, expires_at=PhoneOTP.expiry_at(), request_ip=request_ip, device_signal=device_signal, ) if provider.uses_provider_otp: provider.send_otp(phone_number, channel) else: if channel == OtpChannel.SMS: provider.send_sms(phone_number, message) elif channel == OtpChannel.WHATSAPP: provider.send_whatsapp(phone_number, message) else: raise ValueError(_("Unsupported OTP channel")) return OtpSendResult(request_id=str(otp.id), expires_at=otp.expires_at.isoformat()) def verify_otp(otp: PhoneOTP, code: str) -> bool: if otp.verified_at or otp.is_expired(): return False otp.attempt_count += 1 if otp.attempt_count > otp.max_attempts: otp.save(update_fields=["attempt_count"]) return False provider_cls = PROVIDERS.get(otp.provider) if provider_cls and getattr(provider_cls, "uses_provider_otp", False): provider = provider_cls() try: verified = provider.verify_otp(otp.phone_number, code) except Exception: otp.save(update_fields=["attempt_count"]) raise if not verified: otp.save(update_fields=["attempt_count"]) return False else: if not check_password(code, otp.code_hash): otp.save(update_fields=["attempt_count"]) return False otp.verified_at = timezone.now() otp.save(update_fields=["verified_at", "attempt_count"]) return True