feat: IP & device rate limits

This commit is contained in:
2026-03-14 01:07:26 +03:00
parent 9b87eb74d7
commit ad711d1daf
7 changed files with 212 additions and 7 deletions
@@ -0,0 +1,23 @@
# Generated by Django 6.0.3 on 2026-03-13 21:54
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('accounts', '0004_alter_user_groups_alter_user_phone_number_and_more'),
]
operations = [
migrations.AddField(
model_name='phoneotp',
name='device_signal',
field=models.CharField(blank=True, db_index=True, default='', max_length=64),
),
migrations.AddField(
model_name='phoneotp',
name='request_ip',
field=models.GenericIPAddressField(blank=True, db_index=True, null=True),
),
]
+3
View File
@@ -97,6 +97,9 @@ class PhoneOTP(models.Model):
verified_at = models.DateTimeField(null=True, blank=True) verified_at = models.DateTimeField(null=True, blank=True)
attempt_count = models.PositiveSmallIntegerField(default=0) attempt_count = models.PositiveSmallIntegerField(default=0)
max_attempts = models.PositiveSmallIntegerField(default=5) max_attempts = models.PositiveSmallIntegerField(default=5)
# Request metadata for abuse controls and support investigations.
request_ip = models.GenericIPAddressField(null=True, blank=True, db_index=True)
device_signal = models.CharField(max_length=64, blank=True, default="", db_index=True)
def is_expired(self): def is_expired(self):
return timezone.now() >= self.expires_at return timezone.now() >= self.expires_at
+1
View File
@@ -62,6 +62,7 @@ class OTPVerifySerializer(serializers.Serializer):
class PhoneAuthRequestSerializer(serializers.Serializer): class PhoneAuthRequestSerializer(serializers.Serializer):
phone_number = serializers.CharField(max_length=20) phone_number = serializers.CharField(max_length=20)
channel = serializers.ChoiceField(choices=OtpChannel.choices) channel = serializers.ChoiceField(choices=OtpChannel.choices)
device_id = serializers.CharField(required=False, allow_blank=True, max_length=128, write_only=True)
email = serializers.EmailField(required=False, allow_null=True, allow_blank=True) email = serializers.EmailField(required=False, allow_null=True, allow_blank=True)
first_name = serializers.CharField(required=False, allow_blank=True) first_name = serializers.CharField(required=False, allow_blank=True)
last_name = serializers.CharField(required=False, allow_blank=True) last_name = serializers.CharField(required=False, allow_blank=True)
+86 -6
View File
@@ -1,6 +1,8 @@
import logging import logging
import os import os
import secrets import secrets
from hashlib import sha256
import ipaddress
from dataclasses import dataclass from dataclasses import dataclass
from datetime import timedelta from datetime import timedelta
@@ -33,6 +35,18 @@ class OtpCooldownError(RuntimeError):
self.retry_after_seconds = retry_after_seconds self.retry_after_seconds = retry_after_seconds
class OtpIpRateLimitError(RuntimeError):
def __init__(self, retry_after_seconds: int):
super().__init__(_("Too many OTP requests from this IP. Try again later."))
self.retry_after_seconds = retry_after_seconds
class OtpDeviceRateLimitError(RuntimeError):
def __init__(self, retry_after_seconds: int):
super().__init__(_("Too many OTP requests from this device. Try again later."))
self.retry_after_seconds = retry_after_seconds
class BaseOtpProvider: class BaseOtpProvider:
uses_provider_otp = False uses_provider_otp = False
@@ -166,7 +180,73 @@ def generate_code(length: int = 6) -> str:
return "".join(secrets.choice(digits) for _ in range(length)) return "".join(secrets.choice(digits) for _ in range(length))
def create_and_send_otp(phone_number: str, channel: str, purpose: str = OtpPurpose.AUTH) -> OtpSendResult: def _compute_retry_after_seconds(oldest_recent, now, window_minutes: int, floor_seconds: int = 1) -> int:
if oldest_recent:
retry_after = int((oldest_recent.created_at + timedelta(minutes=window_minutes) - now).total_seconds())
else:
retry_after = floor_seconds
return max(retry_after, floor_seconds)
def normalize_request_ip(raw_ip: str | None) -> str | None:
if not raw_ip:
return None
candidate = raw_ip.split(",")[0].strip()
try:
return str(ipaddress.ip_address(candidate))
except ValueError:
return None
def build_device_signal(device_id: str | None, user_agent: str | None, accept_language: str | None) -> str:
# Prefer explicit device id; fallback to passive headers for coarse signal.
source = (device_id or "").strip()
if not source:
source = f"{(user_agent or '').strip()}|{(accept_language or '').strip()}"
if not source.strip("|"):
return ""
return f"sha256:{sha256(source.encode('utf-8')).hexdigest()[:24]}"
def enforce_phone_auth_request_limits(request_ip: str | None, device_signal: str | None) -> None:
now = timezone.now()
window_minutes = getattr(settings, "PHONE_AUTH_RISK_WINDOW_MINUTES", 15)
window_start = now - timedelta(minutes=window_minutes)
ip_limit = getattr(settings, "PHONE_AUTH_IP_MAX_PER_WINDOW", 20)
device_limit = getattr(settings, "PHONE_AUTH_DEVICE_MAX_PER_WINDOW", 20)
if request_ip and ip_limit > 0:
ip_recent = PhoneOTP.objects.filter(
purpose=OtpPurpose.AUTH,
request_ip=request_ip,
created_at__gte=window_start,
)
if ip_recent.count() >= ip_limit:
oldest_recent = ip_recent.order_by("created_at").first()
raise OtpIpRateLimitError(
retry_after_seconds=_compute_retry_after_seconds(oldest_recent, now, window_minutes)
)
if device_signal and device_limit > 0:
device_recent = PhoneOTP.objects.filter(
purpose=OtpPurpose.AUTH,
device_signal=device_signal,
created_at__gte=window_start,
)
if device_recent.count() >= device_limit:
oldest_recent = device_recent.order_by("created_at").first()
raise OtpDeviceRateLimitError(
retry_after_seconds=_compute_retry_after_seconds(oldest_recent, now, window_minutes)
)
def create_and_send_otp(
phone_number: str,
channel: str,
purpose: str = OtpPurpose.AUTH,
request_ip: str | None = None,
device_signal: str = "",
) -> OtpSendResult:
provider = get_provider() provider = get_provider()
now = timezone.now() now = timezone.now()
window_minutes = getattr(settings, "OTP_WINDOW_MINUTES", 15) window_minutes = getattr(settings, "OTP_WINDOW_MINUTES", 15)
@@ -177,11 +257,9 @@ def create_and_send_otp(phone_number: str, channel: str, purpose: str = OtpPurpo
recent_qs = PhoneOTP.objects.filter(phone_number=phone_number, created_at__gte=window_start) recent_qs = PhoneOTP.objects.filter(phone_number=phone_number, created_at__gte=window_start)
if recent_qs.count() >= max_per_window: if recent_qs.count() >= max_per_window:
oldest_recent = recent_qs.order_by("created_at").first() oldest_recent = recent_qs.order_by("created_at").first()
if oldest_recent: raise OtpRateLimitError(
retry_after = int((oldest_recent.created_at + timedelta(minutes=window_minutes) - now).total_seconds()) retry_after_seconds=_compute_retry_after_seconds(oldest_recent, now, window_minutes, cooldown_seconds)
else: )
retry_after = cooldown_seconds
raise OtpRateLimitError(retry_after_seconds=max(retry_after, cooldown_seconds))
latest = ( latest = (
PhoneOTP.objects.filter(phone_number=phone_number, channel=channel) PhoneOTP.objects.filter(phone_number=phone_number, channel=channel)
@@ -210,6 +288,8 @@ def create_and_send_otp(phone_number: str, channel: str, purpose: str = OtpPurpo
provider=settings.OTP_PROVIDER, provider=settings.OTP_PROVIDER,
code_hash=code_hash, code_hash=code_hash,
expires_at=PhoneOTP.expiry_at(), expires_at=PhoneOTP.expiry_at(),
request_ip=request_ip,
device_signal=device_signal,
) )
if provider.uses_provider_otp: if provider.uses_provider_otp:
@@ -167,3 +167,68 @@ def test_phone_auth_request_rate_limit_returns_retry_after(client):
data = second.json() data = second.json()
assert "detail" in data assert "detail" in data
assert data["retry_after_seconds"] > 0 assert data["retry_after_seconds"] > 0
@pytest.mark.django_db
@override_settings(
OTP_PROVIDER="console",
OTP_MAX_PER_WINDOW=20,
OTP_WINDOW_MINUTES=15,
OTP_RESEND_COOLDOWN_SECONDS=0,
PHONE_AUTH_RISK_WINDOW_MINUTES=15,
PHONE_AUTH_IP_MAX_PER_WINDOW=2,
PHONE_AUTH_DEVICE_MAX_PER_WINDOW=20,
)
def test_phone_auth_request_ip_throttle_returns_retry_after(client):
with patch("apps.accounts.services.otp.generate_code", return_value="123456"):
first = client.post(
reverse("phone_auth_request"),
{"phone_number": "0512345678", "channel": "sms"},
content_type="application/json",
REMOTE_ADDR="203.0.113.10",
)
assert first.status_code == 201
second = client.post(
reverse("phone_auth_request"),
{"phone_number": "0512345679", "channel": "sms"},
content_type="application/json",
REMOTE_ADDR="203.0.113.10",
)
assert second.status_code == 201
third = client.post(
reverse("phone_auth_request"),
{"phone_number": "0512345680", "channel": "sms"},
content_type="application/json",
REMOTE_ADDR="203.0.113.10",
)
assert third.status_code == 429
data = third.json()
assert "detail" in data
assert data["retry_after_seconds"] > 0
@pytest.mark.django_db
@override_settings(
OTP_PROVIDER="console",
OTP_RESEND_COOLDOWN_SECONDS=0,
OTP_MAX_PER_WINDOW=20,
PHONE_AUTH_IP_MAX_PER_WINDOW=20,
PHONE_AUTH_DEVICE_MAX_PER_WINDOW=20,
)
def test_phone_auth_request_persists_request_ip_and_device_signal(client):
with patch("apps.accounts.services.otp.generate_code", return_value="123456"):
response = client.post(
reverse("phone_auth_request"),
{"phone_number": "0512345678", "channel": "sms", "device_id": "device-abc"},
content_type="application/json",
REMOTE_ADDR="198.51.100.40",
HTTP_USER_AGENT="pytest-agent",
)
assert response.status_code == 201
otp = PhoneOTP.objects.get(id=response.json()["request_id"])
assert otp.request_ip == "198.51.100.40"
assert otp.device_signal
+31 -1
View File
@@ -16,8 +16,13 @@ from apps.accounts.serializers import (
) )
from apps.accounts.services.otp import ( from apps.accounts.services.otp import (
OtpCooldownError, OtpCooldownError,
OtpDeviceRateLimitError,
OtpIpRateLimitError,
OtpRateLimitError, OtpRateLimitError,
build_device_signal,
create_and_send_otp, create_and_send_otp,
enforce_phone_auth_request_limits,
normalize_request_ip,
verify_otp, verify_otp,
) )
@@ -93,6 +98,25 @@ class PhoneAuthRequestView(APIView):
data = serializer.validated_data data = serializer.validated_data
phone_number = data["phone_number"] phone_number = data["phone_number"]
email = data.get("email") or None email = data.get("email") or None
request_ip = normalize_request_ip(request.META.get("HTTP_X_FORWARDED_FOR") or request.META.get("REMOTE_ADDR"))
device_signal = build_device_signal(
data.get("device_id"),
request.META.get("HTTP_USER_AGENT"),
request.META.get("HTTP_ACCEPT_LANGUAGE"),
)
try:
enforce_phone_auth_request_limits(request_ip=request_ip, device_signal=device_signal)
except OtpIpRateLimitError as exc:
return Response(
{"detail": str(exc), "retry_after_seconds": exc.retry_after_seconds},
status=status.HTTP_429_TOO_MANY_REQUESTS,
)
except OtpDeviceRateLimitError as exc:
return Response(
{"detail": str(exc), "retry_after_seconds": exc.retry_after_seconds},
status=status.HTTP_429_TOO_MANY_REQUESTS,
)
user = User.objects.filter(phone_number=phone_number).first() user = User.objects.filter(phone_number=phone_number).first()
if not user: if not user:
@@ -110,7 +134,13 @@ class PhoneAuthRequestView(APIView):
) )
try: try:
result = create_and_send_otp(phone_number, data["channel"], purpose=OtpPurpose.AUTH) result = create_and_send_otp(
phone_number,
data["channel"],
purpose=OtpPurpose.AUTH,
request_ip=request_ip,
device_signal=device_signal,
)
except OtpCooldownError as exc: except OtpCooldownError as exc:
return Response( return Response(
{"detail": str(exc), "retry_after_seconds": exc.retry_after_seconds}, {"detail": str(exc), "retry_after_seconds": exc.retry_after_seconds},
+3
View File
@@ -146,6 +146,9 @@ OTP_EXPIRY_MINUTES = int(os.getenv("OTP_EXPIRY_MINUTES", "5"))
OTP_MAX_PER_WINDOW = int(os.getenv("OTP_MAX_PER_WINDOW", "5")) OTP_MAX_PER_WINDOW = int(os.getenv("OTP_MAX_PER_WINDOW", "5"))
OTP_WINDOW_MINUTES = int(os.getenv("OTP_WINDOW_MINUTES", "15")) OTP_WINDOW_MINUTES = int(os.getenv("OTP_WINDOW_MINUTES", "15"))
OTP_RESEND_COOLDOWN_SECONDS = int(os.getenv("OTP_RESEND_COOLDOWN_SECONDS", "60")) OTP_RESEND_COOLDOWN_SECONDS = int(os.getenv("OTP_RESEND_COOLDOWN_SECONDS", "60"))
PHONE_AUTH_RISK_WINDOW_MINUTES = int(os.getenv("PHONE_AUTH_RISK_WINDOW_MINUTES", "15"))
PHONE_AUTH_IP_MAX_PER_WINDOW = int(os.getenv("PHONE_AUTH_IP_MAX_PER_WINDOW", "20"))
PHONE_AUTH_DEVICE_MAX_PER_WINDOW = int(os.getenv("PHONE_AUTH_DEVICE_MAX_PER_WINDOW", "20"))
DEFAULT_CURRENCY = os.getenv("DEFAULT_CURRENCY", "SAR") DEFAULT_CURRENCY = os.getenv("DEFAULT_CURRENCY", "SAR")
NOTIFICATION_PROVIDER = os.getenv("NOTIFICATION_PROVIDER", OTP_PROVIDER) NOTIFICATION_PROVIDER = os.getenv("NOTIFICATION_PROVIDER", OTP_PROVIDER)
NOTIFICATION_DEFAULT_CHANNEL = os.getenv("NOTIFICATION_DEFAULT_CHANNEL", "sms") NOTIFICATION_DEFAULT_CHANNEL = os.getenv("NOTIFICATION_DEFAULT_CHANNEL", "sms")