Files
2026-03-14 01:07:26 +03:00

333 lines
12 KiB
Python

import logging
import os
import secrets
from hashlib import sha256
import ipaddress
from dataclasses import dataclass
from datetime import timedelta
from django.conf import settings
from django.contrib.auth.hashers import check_password, make_password
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from apps.accounts.models import OtpChannel, OtpPurpose, PhoneOTP
logger = logging.getLogger(__name__)
@dataclass
class OtpSendResult:
request_id: str
expires_at: str
class OtpRateLimitError(RuntimeError):
def __init__(self, retry_after_seconds: int):
super().__init__(_("Too many OTP requests. Try again later."))
self.retry_after_seconds = retry_after_seconds
class OtpCooldownError(RuntimeError):
def __init__(self, retry_after_seconds: int):
super().__init__(_("Please wait before requesting another code."))
self.retry_after_seconds = retry_after_seconds
class OtpIpRateLimitError(RuntimeError):
def __init__(self, retry_after_seconds: int):
super().__init__(_("Too many OTP requests from this IP. Try again later."))
self.retry_after_seconds = retry_after_seconds
class OtpDeviceRateLimitError(RuntimeError):
def __init__(self, retry_after_seconds: int):
super().__init__(_("Too many OTP requests from this device. Try again later."))
self.retry_after_seconds = retry_after_seconds
class BaseOtpProvider:
uses_provider_otp = False
def send_sms(self, to_number: str, message: str) -> None:
raise NotImplementedError
def send_whatsapp(self, to_number: str, message: str) -> None:
raise NotImplementedError
def send_otp(self, to_number: str, channel: str) -> None:
raise NotImplementedError
def verify_otp(self, to_number: str, code: str) -> bool:
raise NotImplementedError
class ConsoleOtpProvider(BaseOtpProvider):
def send_sms(self, to_number: str, message: str) -> None:
logger.info("OTP SMS to %s: %s", to_number, message)
def send_whatsapp(self, to_number: str, message: str) -> None:
logger.info("OTP WhatsApp to %s: %s", to_number, message)
class AuthenticaOtpProvider(BaseOtpProvider):
"""Authentica provider for OTP delivery (SMS/WhatsApp) plus custom SMS messaging."""
uses_provider_otp = True
def __init__(self) -> None:
self.api_key = os.getenv("AUTHENTICA_API_KEY")
self.base_url = os.getenv("AUTHENTICA_BASE_URL", "https://api.authentica.sa")
self.timeout_seconds = float(os.getenv("AUTHENTICA_TIMEOUT_SECONDS", "10"))
self.sender_name = os.getenv("AUTHENTICA_SENDER_NAME")
def _assert_config(self) -> None:
if not self.api_key:
raise ValueError(_("Authentica API key is not configured"))
def _headers(self) -> dict:
return {
"Accept": "application/json",
"Content-Type": "application/json",
"X-Authorization": self.api_key,
}
def _post(self, path: str, payload: dict) -> dict:
import requests
self._assert_config()
base_url = self.base_url.rstrip("/")
url = f"{base_url}{path}"
try:
response = requests.post(
url,
headers=self._headers(),
json=payload,
timeout=self.timeout_seconds,
)
except requests.RequestException as exc:
raise RuntimeError(_("Authentica request failed")) from exc
try:
data = response.json()
except ValueError:
data = {"detail": response.text}
if not response.ok:
if os.getenv("AUTHENTICA_DEBUG") == "1" or os.getenv("AUTHENTICA_E2E") == "1":
raise RuntimeError(
_("Authentica request failed: %(status)s %(body)s")
% {"status": response.status_code, "body": response.text}
)
raise RuntimeError(_("Authentica request failed"))
return data
def send_otp(self, to_number: str, channel: str) -> None:
if channel not in (OtpChannel.SMS, OtpChannel.WHATSAPP):
raise ValueError(_("Unsupported OTP channel"))
method = "sms" if channel == OtpChannel.SMS else "whatsapp"
self._post("/api/v2/send-otp", {"method": method, "phone": to_number})
def verify_otp(self, to_number: str, code: str) -> bool:
data = self._post("/api/v2/verify-otp", {"phone": to_number, "otp": code})
if "verified" in data:
verified = bool(data.get("verified"))
else:
verified = bool(data.get("status")) or data.get("message") == "OTP verified successfully"
if not verified and (os.getenv("AUTHENTICA_DEBUG") == "1" or os.getenv("AUTHENTICA_E2E") == "1"):
raise RuntimeError(_("Authentica verify failed: %(response)s") % {"response": data})
return verified
def send_sms(self, to_number: str, message: str) -> None:
if not self.sender_name:
raise ValueError(_("Authentica sender name is not configured"))
self._post(
"/api/v2/send-sms",
{
"phone": to_number,
"message": message,
"sender_name": self.sender_name,
},
)
def send_whatsapp(self, to_number: str, message: str) -> None:
raise ValueError(_("Authentica WhatsApp messaging is not supported"))
PROVIDERS = {
"console": ConsoleOtpProvider,
"authentica": AuthenticaOtpProvider,
}
def _get_provider_for_key(provider_key: str) -> BaseOtpProvider:
provider_cls = PROVIDERS.get(provider_key)
if not provider_cls:
raise ValueError(_("Unknown OTP provider: %(provider)s") % {"provider": provider_key})
return provider_cls()
def get_provider() -> BaseOtpProvider:
return _get_provider_for_key(settings.OTP_PROVIDER)
def generate_code(length: int = 6) -> str:
digits = "0123456789"
return "".join(secrets.choice(digits) for _ in range(length))
def _compute_retry_after_seconds(oldest_recent, now, window_minutes: int, floor_seconds: int = 1) -> int:
if oldest_recent:
retry_after = int((oldest_recent.created_at + timedelta(minutes=window_minutes) - now).total_seconds())
else:
retry_after = floor_seconds
return max(retry_after, floor_seconds)
def normalize_request_ip(raw_ip: str | None) -> str | None:
if not raw_ip:
return None
candidate = raw_ip.split(",")[0].strip()
try:
return str(ipaddress.ip_address(candidate))
except ValueError:
return None
def build_device_signal(device_id: str | None, user_agent: str | None, accept_language: str | None) -> str:
# Prefer explicit device id; fallback to passive headers for coarse signal.
source = (device_id or "").strip()
if not source:
source = f"{(user_agent or '').strip()}|{(accept_language or '').strip()}"
if not source.strip("|"):
return ""
return f"sha256:{sha256(source.encode('utf-8')).hexdigest()[:24]}"
def enforce_phone_auth_request_limits(request_ip: str | None, device_signal: str | None) -> None:
now = timezone.now()
window_minutes = getattr(settings, "PHONE_AUTH_RISK_WINDOW_MINUTES", 15)
window_start = now - timedelta(minutes=window_minutes)
ip_limit = getattr(settings, "PHONE_AUTH_IP_MAX_PER_WINDOW", 20)
device_limit = getattr(settings, "PHONE_AUTH_DEVICE_MAX_PER_WINDOW", 20)
if request_ip and ip_limit > 0:
ip_recent = PhoneOTP.objects.filter(
purpose=OtpPurpose.AUTH,
request_ip=request_ip,
created_at__gte=window_start,
)
if ip_recent.count() >= ip_limit:
oldest_recent = ip_recent.order_by("created_at").first()
raise OtpIpRateLimitError(
retry_after_seconds=_compute_retry_after_seconds(oldest_recent, now, window_minutes)
)
if device_signal and device_limit > 0:
device_recent = PhoneOTP.objects.filter(
purpose=OtpPurpose.AUTH,
device_signal=device_signal,
created_at__gte=window_start,
)
if device_recent.count() >= device_limit:
oldest_recent = device_recent.order_by("created_at").first()
raise OtpDeviceRateLimitError(
retry_after_seconds=_compute_retry_after_seconds(oldest_recent, now, window_minutes)
)
def create_and_send_otp(
phone_number: str,
channel: str,
purpose: str = OtpPurpose.AUTH,
request_ip: str | None = None,
device_signal: str = "",
) -> OtpSendResult:
provider = get_provider()
now = timezone.now()
window_minutes = getattr(settings, "OTP_WINDOW_MINUTES", 15)
max_per_window = getattr(settings, "OTP_MAX_PER_WINDOW", 5)
cooldown_seconds = getattr(settings, "OTP_RESEND_COOLDOWN_SECONDS", 60)
window_start = now - timedelta(minutes=window_minutes)
recent_qs = PhoneOTP.objects.filter(phone_number=phone_number, created_at__gte=window_start)
if recent_qs.count() >= max_per_window:
oldest_recent = recent_qs.order_by("created_at").first()
raise OtpRateLimitError(
retry_after_seconds=_compute_retry_after_seconds(oldest_recent, now, window_minutes, cooldown_seconds)
)
latest = (
PhoneOTP.objects.filter(phone_number=phone_number, channel=channel)
.order_by("-created_at")
.first()
)
if latest and latest.verified_at is None:
elapsed = (now - latest.created_at).total_seconds()
if elapsed < cooldown_seconds:
raise OtpCooldownError(retry_after_seconds=int(cooldown_seconds - elapsed))
if provider.uses_provider_otp:
code_hash = make_password(secrets.token_urlsafe(16))
message = None
else:
code = generate_code()
code_hash = make_password(code)
message = _(
"Your verification code is %(code)s. It expires in %(minutes)s minutes."
) % {"code": code, "minutes": settings.OTP_EXPIRY_MINUTES}
otp = PhoneOTP.objects.create(
phone_number=phone_number,
channel=channel,
purpose=purpose,
provider=settings.OTP_PROVIDER,
code_hash=code_hash,
expires_at=PhoneOTP.expiry_at(),
request_ip=request_ip,
device_signal=device_signal,
)
if provider.uses_provider_otp:
provider.send_otp(phone_number, channel)
else:
if channel == OtpChannel.SMS:
provider.send_sms(phone_number, message)
elif channel == OtpChannel.WHATSAPP:
provider.send_whatsapp(phone_number, message)
else:
raise ValueError(_("Unsupported OTP channel"))
return OtpSendResult(request_id=str(otp.id), expires_at=otp.expires_at.isoformat())
def verify_otp(otp: PhoneOTP, code: str) -> bool:
if otp.verified_at or otp.is_expired():
return False
otp.attempt_count += 1
if otp.attempt_count > otp.max_attempts:
otp.save(update_fields=["attempt_count"])
return False
provider_cls = PROVIDERS.get(otp.provider)
if provider_cls and getattr(provider_cls, "uses_provider_otp", False):
provider = provider_cls()
try:
verified = provider.verify_otp(otp.phone_number, code)
except Exception:
otp.save(update_fields=["attempt_count"])
raise
if not verified:
otp.save(update_fields=["attempt_count"])
return False
else:
if not check_password(code, otp.code_hash):
otp.save(update_fields=["attempt_count"])
return False
otp.verified_at = timezone.now()
otp.save(update_fields=["verified_at", "attempt_count"])
return True