253 lines
8.7 KiB
Python
253 lines
8.7 KiB
Python
import logging
|
|
import os
|
|
import secrets
|
|
from dataclasses import dataclass
|
|
from datetime import timedelta
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.hashers import check_password, make_password
|
|
from django.utils import timezone
|
|
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from apps.accounts.models import OtpChannel, OtpPurpose, PhoneOTP
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class OtpSendResult:
|
|
request_id: str
|
|
expires_at: str
|
|
|
|
|
|
class OtpRateLimitError(RuntimeError):
|
|
def __init__(self, retry_after_seconds: int):
|
|
super().__init__(_("Too many OTP requests. Try again later."))
|
|
self.retry_after_seconds = retry_after_seconds
|
|
|
|
|
|
class OtpCooldownError(RuntimeError):
|
|
def __init__(self, retry_after_seconds: int):
|
|
super().__init__(_("Please wait before requesting another code."))
|
|
self.retry_after_seconds = retry_after_seconds
|
|
|
|
|
|
class BaseOtpProvider:
|
|
uses_provider_otp = False
|
|
|
|
def send_sms(self, to_number: str, message: str) -> None:
|
|
raise NotImplementedError
|
|
|
|
def send_whatsapp(self, to_number: str, message: str) -> None:
|
|
raise NotImplementedError
|
|
|
|
def send_otp(self, to_number: str, channel: str) -> None:
|
|
raise NotImplementedError
|
|
|
|
def verify_otp(self, to_number: str, code: str) -> bool:
|
|
raise NotImplementedError
|
|
|
|
|
|
class ConsoleOtpProvider(BaseOtpProvider):
|
|
def send_sms(self, to_number: str, message: str) -> None:
|
|
logger.info("OTP SMS to %s: %s", to_number, message)
|
|
|
|
def send_whatsapp(self, to_number: str, message: str) -> None:
|
|
logger.info("OTP WhatsApp to %s: %s", to_number, message)
|
|
|
|
|
|
|
|
|
|
class AuthenticaOtpProvider(BaseOtpProvider):
|
|
"""Authentica provider for OTP delivery (SMS/WhatsApp) plus custom SMS messaging."""
|
|
|
|
uses_provider_otp = True
|
|
|
|
def __init__(self) -> None:
|
|
self.api_key = os.getenv("AUTHENTICA_API_KEY")
|
|
self.base_url = os.getenv("AUTHENTICA_BASE_URL", "https://api.authentica.sa")
|
|
self.timeout_seconds = float(os.getenv("AUTHENTICA_TIMEOUT_SECONDS", "10"))
|
|
self.sender_name = os.getenv("AUTHENTICA_SENDER_NAME")
|
|
|
|
def _assert_config(self) -> None:
|
|
if not self.api_key:
|
|
raise ValueError(_("Authentica API key is not configured"))
|
|
|
|
def _headers(self) -> dict:
|
|
return {
|
|
"Accept": "application/json",
|
|
"Content-Type": "application/json",
|
|
"X-Authorization": self.api_key,
|
|
}
|
|
|
|
def _post(self, path: str, payload: dict) -> dict:
|
|
import requests
|
|
|
|
self._assert_config()
|
|
base_url = self.base_url.rstrip("/")
|
|
url = f"{base_url}{path}"
|
|
try:
|
|
response = requests.post(
|
|
url,
|
|
headers=self._headers(),
|
|
json=payload,
|
|
timeout=self.timeout_seconds,
|
|
)
|
|
except requests.RequestException as exc:
|
|
raise RuntimeError(_("Authentica request failed")) from exc
|
|
|
|
try:
|
|
data = response.json()
|
|
except ValueError:
|
|
data = {"detail": response.text}
|
|
|
|
if not response.ok:
|
|
if os.getenv("AUTHENTICA_DEBUG") == "1" or os.getenv("AUTHENTICA_E2E") == "1":
|
|
raise RuntimeError(
|
|
_("Authentica request failed: %(status)s %(body)s")
|
|
% {"status": response.status_code, "body": response.text}
|
|
)
|
|
raise RuntimeError(_("Authentica request failed"))
|
|
|
|
return data
|
|
|
|
def send_otp(self, to_number: str, channel: str) -> None:
|
|
if channel not in (OtpChannel.SMS, OtpChannel.WHATSAPP):
|
|
raise ValueError(_("Unsupported OTP channel"))
|
|
method = "sms" if channel == OtpChannel.SMS else "whatsapp"
|
|
self._post("/api/v2/send-otp", {"method": method, "phone": to_number})
|
|
|
|
def verify_otp(self, to_number: str, code: str) -> bool:
|
|
data = self._post("/api/v2/verify-otp", {"phone": to_number, "otp": code})
|
|
if "verified" in data:
|
|
verified = bool(data.get("verified"))
|
|
else:
|
|
verified = bool(data.get("status")) or data.get("message") == "OTP verified successfully"
|
|
if not verified and (os.getenv("AUTHENTICA_DEBUG") == "1" or os.getenv("AUTHENTICA_E2E") == "1"):
|
|
raise RuntimeError(_("Authentica verify failed: %(response)s") % {"response": data})
|
|
return verified
|
|
|
|
def send_sms(self, to_number: str, message: str) -> None:
|
|
if not self.sender_name:
|
|
raise ValueError(_("Authentica sender name is not configured"))
|
|
self._post(
|
|
"/api/v2/send-sms",
|
|
{
|
|
"phone": to_number,
|
|
"message": message,
|
|
"sender_name": self.sender_name,
|
|
},
|
|
)
|
|
|
|
def send_whatsapp(self, to_number: str, message: str) -> None:
|
|
raise ValueError(_("Authentica WhatsApp messaging is not supported"))
|
|
|
|
|
|
PROVIDERS = {
|
|
"console": ConsoleOtpProvider,
|
|
"authentica": AuthenticaOtpProvider,
|
|
}
|
|
|
|
|
|
def _get_provider_for_key(provider_key: str) -> BaseOtpProvider:
|
|
provider_cls = PROVIDERS.get(provider_key)
|
|
if not provider_cls:
|
|
raise ValueError(_("Unknown OTP provider: %(provider)s") % {"provider": provider_key})
|
|
return provider_cls()
|
|
|
|
|
|
def get_provider() -> BaseOtpProvider:
|
|
return _get_provider_for_key(settings.OTP_PROVIDER)
|
|
|
|
|
|
def generate_code(length: int = 6) -> str:
|
|
digits = "0123456789"
|
|
return "".join(secrets.choice(digits) for _ in range(length))
|
|
|
|
|
|
def create_and_send_otp(phone_number: str, channel: str, purpose: str = OtpPurpose.AUTH) -> OtpSendResult:
|
|
provider = get_provider()
|
|
now = timezone.now()
|
|
window_minutes = getattr(settings, "OTP_WINDOW_MINUTES", 15)
|
|
max_per_window = getattr(settings, "OTP_MAX_PER_WINDOW", 5)
|
|
cooldown_seconds = getattr(settings, "OTP_RESEND_COOLDOWN_SECONDS", 60)
|
|
|
|
window_start = now - timedelta(minutes=window_minutes)
|
|
recent_qs = PhoneOTP.objects.filter(phone_number=phone_number, created_at__gte=window_start)
|
|
if recent_qs.count() >= max_per_window:
|
|
oldest_recent = recent_qs.order_by("created_at").first()
|
|
if oldest_recent:
|
|
retry_after = int((oldest_recent.created_at + timedelta(minutes=window_minutes) - now).total_seconds())
|
|
else:
|
|
retry_after = cooldown_seconds
|
|
raise OtpRateLimitError(retry_after_seconds=max(retry_after, cooldown_seconds))
|
|
|
|
latest = (
|
|
PhoneOTP.objects.filter(phone_number=phone_number, channel=channel)
|
|
.order_by("-created_at")
|
|
.first()
|
|
)
|
|
if latest and latest.verified_at is None:
|
|
elapsed = (now - latest.created_at).total_seconds()
|
|
if elapsed < cooldown_seconds:
|
|
raise OtpCooldownError(retry_after_seconds=int(cooldown_seconds - elapsed))
|
|
|
|
if provider.uses_provider_otp:
|
|
code_hash = make_password(secrets.token_urlsafe(16))
|
|
message = None
|
|
else:
|
|
code = generate_code()
|
|
code_hash = make_password(code)
|
|
message = _(
|
|
"Your verification code is %(code)s. It expires in %(minutes)s minutes."
|
|
) % {"code": code, "minutes": settings.OTP_EXPIRY_MINUTES}
|
|
|
|
otp = PhoneOTP.objects.create(
|
|
phone_number=phone_number,
|
|
channel=channel,
|
|
purpose=purpose,
|
|
provider=settings.OTP_PROVIDER,
|
|
code_hash=code_hash,
|
|
expires_at=PhoneOTP.expiry_at(),
|
|
)
|
|
|
|
if provider.uses_provider_otp:
|
|
provider.send_otp(phone_number, channel)
|
|
else:
|
|
if channel == OtpChannel.SMS:
|
|
provider.send_sms(phone_number, message)
|
|
elif channel == OtpChannel.WHATSAPP:
|
|
provider.send_whatsapp(phone_number, message)
|
|
else:
|
|
raise ValueError(_("Unsupported OTP channel"))
|
|
|
|
return OtpSendResult(request_id=str(otp.id), expires_at=otp.expires_at.isoformat())
|
|
|
|
|
|
def verify_otp(otp: PhoneOTP, code: str) -> bool:
|
|
if otp.verified_at or otp.is_expired():
|
|
return False
|
|
otp.attempt_count += 1
|
|
if otp.attempt_count > otp.max_attempts:
|
|
otp.save(update_fields=["attempt_count"])
|
|
return False
|
|
provider_cls = PROVIDERS.get(otp.provider)
|
|
if provider_cls and getattr(provider_cls, "uses_provider_otp", False):
|
|
provider = provider_cls()
|
|
try:
|
|
verified = provider.verify_otp(otp.phone_number, code)
|
|
except Exception:
|
|
otp.save(update_fields=["attempt_count"])
|
|
raise
|
|
if not verified:
|
|
otp.save(update_fields=["attempt_count"])
|
|
return False
|
|
else:
|
|
if not check_password(code, otp.code_hash):
|
|
otp.save(update_fields=["attempt_count"])
|
|
return False
|
|
otp.verified_at = timezone.now()
|
|
otp.save(update_fields=["verified_at", "attempt_count"])
|
|
return True
|