"""Tests for OTP issuance limits (rate limit, resend cooldown) and OTP verification."""

from datetime import timedelta
from unittest.mock import patch

import pytest
from django.contrib.auth.hashers import make_password
from django.test import override_settings
from django.utils import timezone

from apps.accounts.models import OtpChannel, OtpPurpose, PhoneOTP
from apps.accounts.services.otp import (
    OtpCooldownError,
    OtpRateLimitError,
    create_and_send_otp,
    verify_otp,
)

# Shared fixture values used by every test in this module.
PHONE = "+966512345678"
VALID_CODE = "123456"
WRONG_CODE = "000000"


def _make_otp(expires_at=None):
    """Persist an SMS/AUTH ``PhoneOTP`` for ``PHONE`` whose code is ``VALID_CODE``.

    Args:
        expires_at: Expiry timestamp to store; when omitted,
            ``PhoneOTP.expiry_at()`` is used.

    Returns:
        The created ``PhoneOTP`` row.
    """
    hashed_code = make_password(VALID_CODE)
    if expires_at is None:
        expires_at = PhoneOTP.expiry_at()
    return PhoneOTP.objects.create(
        phone_number=PHONE,
        channel=OtpChannel.SMS,
        purpose=OtpPurpose.AUTH,
        provider="console",
        code_hash=hashed_code,
        expires_at=expires_at,
    )


@pytest.mark.django_db
@override_settings(OTP_PROVIDER="console", OTP_MAX_PER_WINDOW=1, OTP_WINDOW_MINUTES=15, OTP_RESEND_COOLDOWN_SECONDS=0)
def test_otp_rate_limit():
    """With a per-window maximum of 1, the second send raises ``OtpRateLimitError``."""
    create_and_send_otp(PHONE, OtpChannel.SMS)

    with pytest.raises(OtpRateLimitError):
        create_and_send_otp(PHONE, OtpChannel.SMS)


@pytest.mark.django_db
@override_settings(
    OTP_PROVIDER="console",
    OTP_MAX_PER_WINDOW=5,
    OTP_WINDOW_MINUTES=15,
    OTP_RESEND_COOLDOWN_SECONDS=60,
)
def test_otp_cooldown_enforced():
    """An immediate resend under a 60s cooldown raises ``OtpCooldownError``."""
    create_and_send_otp(PHONE, OtpChannel.SMS)

    with pytest.raises(OtpCooldownError):
        create_and_send_otp(PHONE, OtpChannel.SMS)


@pytest.mark.django_db
def test_otp_max_attempts_blocks_verification():
    """After ``max_attempts`` wrong guesses, the correct code is rejected as well."""
    record = _make_otp()

    # Exhaust the attempt budget with an incorrect code.
    for _attempt in range(record.max_attempts):
        assert verify_otp(record, WRONG_CODE) is False

    record.refresh_from_db()
    assert record.attempt_count == record.max_attempts

    # The budget is spent, so the right code must not verify either.
    assert verify_otp(record, VALID_CODE) is False

    record.refresh_from_db()
    # Only a lower bound: whether the counter keeps growing after lockout is
    # deliberately left unspecified by this test.
    assert record.attempt_count >= record.max_attempts
    assert record.verified_at is None


@pytest.mark.django_db
@override_settings(OTP_PROVIDER="console", OTP_RESEND_COOLDOWN_SECONDS=60)
def test_otp_cooldown_retry_after_seconds():
    """``OtpCooldownError.retry_after_seconds`` is the full cooldown when no time has passed."""
    frozen_now = timezone.now()

    with patch("apps.accounts.services.otp.timezone.now", return_value=frozen_now):
        first = create_and_send_otp(PHONE, OtpChannel.SMS)

        # Pin created_at to the frozen clock so the remaining cooldown is exact.
        PhoneOTP.objects.filter(id=first.request_id).update(created_at=frozen_now)

        with pytest.raises(OtpCooldownError) as raised:
            create_and_send_otp(PHONE, OtpChannel.SMS)

    assert raised.value.retry_after_seconds == 60


@pytest.mark.django_db
@override_settings(OTP_PROVIDER="console", OTP_MAX_PER_WINDOW=1, OTP_WINDOW_MINUTES=15, OTP_RESEND_COOLDOWN_SECONDS=0)
def test_otp_rate_limit_retry_after_seconds():
    """``OtpRateLimitError.retry_after_seconds`` is the time until the oldest OTP leaves the window."""
    frozen_now = timezone.now()

    with patch("apps.accounts.services.otp.timezone.now", return_value=frozen_now):
        first = create_and_send_otp(PHONE, OtpChannel.SMS)

        # Backdate the OTP so it falls out of the 15-minute window in 10 seconds.
        window_open = frozen_now - timedelta(minutes=15)
        backdated = window_open + timedelta(seconds=10)
        PhoneOTP.objects.filter(id=first.request_id).update(created_at=backdated)

        with pytest.raises(OtpRateLimitError) as raised:
            create_and_send_otp(PHONE, OtpChannel.SMS)

    assert raised.value.retry_after_seconds == 10


@pytest.mark.django_db
@override_settings(OTP_PROVIDER="console", OTP_RESEND_COOLDOWN_SECONDS=1)
def test_otp_resend_after_cooldown_ok():
    """A resend succeeds once the previous OTP is older than the cooldown."""
    first = create_and_send_otp(PHONE, OtpChannel.SMS)

    # Backdate the first OTP by 5s so the 1s cooldown has already elapsed.
    PhoneOTP.objects.filter(id=first.request_id).update(
        created_at=timezone.now() - timedelta(seconds=5)
    )

    create_and_send_otp(PHONE, OtpChannel.SMS)

    stored = PhoneOTP.objects.filter(phone_number=PHONE)
    assert stored.count() == 2


@pytest.mark.django_db
def test_verify_otp_rejects_expired():
    """An expired OTP fails verification and records no attempt."""
    one_minute_ago = timezone.now() - timedelta(minutes=1)
    record = _make_otp(expires_at=one_minute_ago)

    assert verify_otp(record, VALID_CODE) is False

    record.refresh_from_db()
    assert record.attempt_count == 0
    assert record.verified_at is None


@pytest.mark.django_db
def test_verify_otp_rejects_reuse_after_verified():
    """A verified OTP cannot be used again, and the repeat leaves the attempt count unchanged."""
    record = _make_otp()

    # First use of the correct code succeeds and counts as one attempt.
    assert verify_otp(record, VALID_CODE) is True
    record.refresh_from_db()
    assert record.attempt_count == 1

    # Second use of the same code is refused; the counter stays at one.
    assert verify_otp(record, VALID_CODE) is False
    record.refresh_from_db()
    assert record.attempt_count == 1