Files
Salon/backend/apps/accounts/services/otp.py
T
mohd 46af911a06 I proceeded with Phase 1 step 1 (phone-first auth + OTP hardening) and updated the plan context. Here’s the current state and the next steps.
What I implemented

Phone-first auth endpoints with OTP → JWT issuance.
views.py
urls.py
Phone normalization (KSA-focused, E.164 or Saudi mobile) and validation.
phone.py
serializers.py
OTP protections: rate limit, resend cooldown, attempt counting.
otp.py
models.py
Email is now optional to allow phone-only users.
models.py
0002_phone_auth_fields.py
Admin OTP visibility improved.
admin.py
Risks updated.
risks.md
2026-02-27 15:57:09 +03:00

174 lines
6.1 KiB
Python

import logging
import os
import secrets
from datetime import timedelta
from dataclasses import dataclass
from django.conf import settings
from django.contrib.auth.hashers import check_password, make_password
from django.utils import timezone
from apps.accounts.models import OtpChannel, OtpPurpose, PhoneOTP
logger = logging.getLogger(__name__)
@dataclass
class OtpSendResult:
request_id: str
expires_at: str
class OtpRateLimitError(RuntimeError):
def __init__(self, retry_after_seconds: int):
super().__init__("Too many OTP requests. Try again later.")
self.retry_after_seconds = retry_after_seconds
class OtpCooldownError(RuntimeError):
def __init__(self, retry_after_seconds: int):
super().__init__("Please wait before requesting another code.")
self.retry_after_seconds = retry_after_seconds
class BaseOtpProvider:
def send_sms(self, to_number: str, message: str) -> None:
raise NotImplementedError
def send_whatsapp(self, to_number: str, message: str) -> None:
raise NotImplementedError
class ConsoleOtpProvider(BaseOtpProvider):
def send_sms(self, to_number: str, message: str) -> None:
logger.info("OTP SMS to %s: %s", to_number, message)
def send_whatsapp(self, to_number: str, message: str) -> None:
logger.info("OTP WhatsApp to %s: %s", to_number, message)
class TwilioOtpProvider(BaseOtpProvider):
def __init__(self) -> None:
self.account_sid = os.getenv("TWILIO_ACCOUNT_SID")
self.auth_token = os.getenv("TWILIO_AUTH_TOKEN")
self.from_number = os.getenv("TWILIO_FROM_NUMBER")
self.whatsapp_from = os.getenv("TWILIO_WHATSAPP_FROM")
def _assert_config(self) -> None:
if not self.account_sid or not self.auth_token or not self.from_number:
raise ValueError("Twilio credentials are not configured")
def send_sms(self, to_number: str, message: str) -> None:
self._assert_config()
raise NotImplementedError("Twilio SMS adapter not implemented yet")
def send_whatsapp(self, to_number: str, message: str) -> None:
self._assert_config()
if not self.whatsapp_from:
raise ValueError("Twilio WhatsApp sender is not configured")
raise NotImplementedError("Twilio WhatsApp adapter not implemented yet")
class UnifonicOtpProvider(BaseOtpProvider):
def __init__(self) -> None:
self.app_sid = os.getenv("UNIFONIC_APP_SID")
self.sender_id = os.getenv("UNIFONIC_SENDER_ID")
self.whatsapp_sender = os.getenv("UNIFONIC_WHATSAPP_SENDER")
def _assert_config(self) -> None:
if not self.app_sid or not self.sender_id:
raise ValueError("Unifonic credentials are not configured")
def send_sms(self, to_number: str, message: str) -> None:
self._assert_config()
raise NotImplementedError("Unifonic SMS adapter not implemented yet")
def send_whatsapp(self, to_number: str, message: str) -> None:
self._assert_config()
if not self.whatsapp_sender:
raise ValueError("Unifonic WhatsApp sender is not configured")
raise NotImplementedError("Unifonic WhatsApp adapter not implemented yet")
PROVIDERS = {
"console": ConsoleOtpProvider,
"twilio": TwilioOtpProvider,
"unifonic": UnifonicOtpProvider,
}
def get_provider() -> BaseOtpProvider:
provider_key = settings.OTP_PROVIDER
provider_cls = PROVIDERS.get(provider_key)
if not provider_cls:
raise ValueError(f"Unknown OTP provider: {provider_key}")
return provider_cls()
def generate_code(length: int = 6) -> str:
digits = "0123456789"
return "".join(secrets.choice(digits) for _ in range(length))
def create_and_send_otp(phone_number: str, channel: str, purpose: str = OtpPurpose.AUTH) -> OtpSendResult:
provider = get_provider()
now = timezone.now()
window_minutes = getattr(settings, "OTP_WINDOW_MINUTES", 15)
max_per_window = getattr(settings, "OTP_MAX_PER_WINDOW", 5)
cooldown_seconds = getattr(settings, "OTP_RESEND_COOLDOWN_SECONDS", 60)
window_start = now - timedelta(minutes=window_minutes)
recent_qs = PhoneOTP.objects.filter(phone_number=phone_number, created_at__gte=window_start)
if recent_qs.count() >= max_per_window:
oldest_recent = recent_qs.order_by("created_at").first()
if oldest_recent:
retry_after = int((oldest_recent.created_at + timedelta(minutes=window_minutes) - now).total_seconds())
else:
retry_after = cooldown_seconds
raise OtpRateLimitError(retry_after_seconds=max(retry_after, cooldown_seconds))
latest = (
PhoneOTP.objects.filter(phone_number=phone_number, channel=channel)
.order_by("-created_at")
.first()
)
if latest and latest.verified_at is None:
elapsed = (now - latest.created_at).total_seconds()
if elapsed < cooldown_seconds:
raise OtpCooldownError(retry_after_seconds=int(cooldown_seconds - elapsed))
code = generate_code()
otp = PhoneOTP.objects.create(
phone_number=phone_number,
channel=channel,
purpose=purpose,
provider=settings.OTP_PROVIDER,
code_hash=make_password(code),
expires_at=PhoneOTP.expiry_at(),
)
message = f"Your verification code is {code}. It expires in {settings.OTP_EXPIRY_MINUTES} minutes."
if channel == OtpChannel.SMS:
provider.send_sms(phone_number, message)
elif channel == OtpChannel.WHATSAPP:
provider.send_whatsapp(phone_number, message)
else:
raise ValueError("Unsupported OTP channel")
return OtpSendResult(request_id=str(otp.id), expires_at=otp.expires_at.isoformat())
def verify_otp(otp: PhoneOTP, code: str) -> bool:
if otp.verified_at or otp.is_expired():
return False
otp.attempt_count += 1
if otp.attempt_count > otp.max_attempts:
otp.save(update_fields=["attempt_count"])
return False
if not check_password(code, otp.code_hash):
otp.save(update_fields=["attempt_count"])
return False
otp.verified_at = timezone.now()
otp.save(update_fields=["verified_at", "attempt_count"])
return True