I proceeded with Phase 1 step 1 (phone-first auth + OTP hardening) and updated the plan context. Here’s the current state and the next steps.
What I implemented Phone-first auth endpoints with OTP → JWT issuance. views.py urls.py Phone normalization (KSA-focused, E.164 or Saudi mobile) and validation. phone.py serializers.py OTP protections: rate limit, resend cooldown, attempt counting. otp.py models.py Email is now optional to allow phone-only users. models.py 0002_phone_auth_fields.py Admin OTP visibility improved. admin.py Risks updated. risks.md
This commit is contained in:
@@ -1,13 +1,14 @@
|
||||
import logging
|
||||
import os
|
||||
import secrets
|
||||
from datetime import timedelta
|
||||
from dataclasses import dataclass
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.hashers import check_password, make_password
|
||||
from django.utils import timezone
|
||||
|
||||
from apps.accounts.models import OtpChannel, PhoneOTP
|
||||
from apps.accounts.models import OtpChannel, OtpPurpose, PhoneOTP
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -18,6 +19,18 @@ class OtpSendResult:
|
||||
expires_at: str
|
||||
|
||||
|
||||
class OtpRateLimitError(RuntimeError):
|
||||
def __init__(self, retry_after_seconds: int):
|
||||
super().__init__("Too many OTP requests. Try again later.")
|
||||
self.retry_after_seconds = retry_after_seconds
|
||||
|
||||
|
||||
class OtpCooldownError(RuntimeError):
|
||||
def __init__(self, retry_after_seconds: int):
|
||||
super().__init__("Please wait before requesting another code.")
|
||||
self.retry_after_seconds = retry_after_seconds
|
||||
|
||||
|
||||
class BaseOtpProvider:
|
||||
def send_sms(self, to_number: str, message: str) -> None:
|
||||
raise NotImplementedError
|
||||
@@ -97,12 +110,38 @@ def generate_code(length: int = 6) -> str:
|
||||
return "".join(secrets.choice(digits) for _ in range(length))
|
||||
|
||||
|
||||
def create_and_send_otp(phone_number: str, channel: str) -> OtpSendResult:
|
||||
def create_and_send_otp(phone_number: str, channel: str, purpose: str = OtpPurpose.AUTH) -> OtpSendResult:
|
||||
provider = get_provider()
|
||||
now = timezone.now()
|
||||
window_minutes = getattr(settings, "OTP_WINDOW_MINUTES", 15)
|
||||
max_per_window = getattr(settings, "OTP_MAX_PER_WINDOW", 5)
|
||||
cooldown_seconds = getattr(settings, "OTP_RESEND_COOLDOWN_SECONDS", 60)
|
||||
|
||||
window_start = now - timedelta(minutes=window_minutes)
|
||||
recent_qs = PhoneOTP.objects.filter(phone_number=phone_number, created_at__gte=window_start)
|
||||
if recent_qs.count() >= max_per_window:
|
||||
oldest_recent = recent_qs.order_by("created_at").first()
|
||||
if oldest_recent:
|
||||
retry_after = int((oldest_recent.created_at + timedelta(minutes=window_minutes) - now).total_seconds())
|
||||
else:
|
||||
retry_after = cooldown_seconds
|
||||
raise OtpRateLimitError(retry_after_seconds=max(retry_after, cooldown_seconds))
|
||||
|
||||
latest = (
|
||||
PhoneOTP.objects.filter(phone_number=phone_number, channel=channel)
|
||||
.order_by("-created_at")
|
||||
.first()
|
||||
)
|
||||
if latest and latest.verified_at is None:
|
||||
elapsed = (now - latest.created_at).total_seconds()
|
||||
if elapsed < cooldown_seconds:
|
||||
raise OtpCooldownError(retry_after_seconds=int(cooldown_seconds - elapsed))
|
||||
|
||||
code = generate_code()
|
||||
otp = PhoneOTP.objects.create(
|
||||
phone_number=phone_number,
|
||||
channel=channel,
|
||||
purpose=purpose,
|
||||
provider=settings.OTP_PROVIDER,
|
||||
code_hash=make_password(code),
|
||||
expires_at=PhoneOTP.expiry_at(),
|
||||
@@ -122,8 +161,13 @@ def create_and_send_otp(phone_number: str, channel: str) -> OtpSendResult:
|
||||
def verify_otp(otp: PhoneOTP, code: str) -> bool:
|
||||
if otp.verified_at or otp.is_expired():
|
||||
return False
|
||||
otp.attempt_count += 1
|
||||
if otp.attempt_count > otp.max_attempts:
|
||||
otp.save(update_fields=["attempt_count"])
|
||||
return False
|
||||
if not check_password(code, otp.code_hash):
|
||||
otp.save(update_fields=["attempt_count"])
|
||||
return False
|
||||
otp.verified_at = timezone.now()
|
||||
otp.save(update_fields=["verified_at"])
|
||||
otp.save(update_fields=["verified_at", "attempt_count"])
|
||||
return True
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
import re
|
||||
|
||||
|
||||
def normalize_phone_number(raw_phone: str) -> str:
|
||||
if not raw_phone:
|
||||
raise ValueError("Phone number is required")
|
||||
|
||||
phone = re.sub(r"[\s\-\(\)]", "", raw_phone)
|
||||
if phone.startswith("00"):
|
||||
phone = "+" + phone[2:]
|
||||
|
||||
if phone.startswith("+"):
|
||||
digits = phone[1:]
|
||||
if not digits.isdigit() or not (8 <= len(digits) <= 15):
|
||||
raise ValueError("Invalid phone number format")
|
||||
return "+" + digits
|
||||
|
||||
digits = re.sub(r"\D", "", phone)
|
||||
|
||||
if digits.startswith("0") and len(digits) == 10 and digits[1] == "5":
|
||||
return "+966" + digits[1:]
|
||||
|
||||
if digits.startswith("5") and len(digits) == 9:
|
||||
return "+966" + digits
|
||||
|
||||
raise ValueError("Phone number must be in E.164 format or a valid Saudi mobile")
|
||||
Reference in New Issue
Block a user