127 lines
4.6 KiB
Python
127 lines
4.6 KiB
Python
from datetime import timedelta
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from django.contrib.auth.hashers import make_password
|
|
from django.test import override_settings
|
|
from django.utils import timezone
|
|
|
|
from apps.accounts.models import OtpChannel, OtpPurpose, PhoneOTP
|
|
from apps.accounts.services.otp import OtpCooldownError, OtpRateLimitError, create_and_send_otp, verify_otp
|
|
|
|
|
|
@pytest.mark.django_db
|
|
@override_settings(OTP_PROVIDER="console", OTP_MAX_PER_WINDOW=1, OTP_WINDOW_MINUTES=15, OTP_RESEND_COOLDOWN_SECONDS=0)
|
|
def test_otp_rate_limit():
|
|
create_and_send_otp("+966512345678", OtpChannel.SMS)
|
|
with pytest.raises(OtpRateLimitError):
|
|
create_and_send_otp("+966512345678", OtpChannel.SMS)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
@override_settings(
|
|
OTP_PROVIDER="console",
|
|
OTP_MAX_PER_WINDOW=5,
|
|
OTP_WINDOW_MINUTES=15,
|
|
OTP_RESEND_COOLDOWN_SECONDS=60,
|
|
)
|
|
def test_otp_cooldown_enforced():
|
|
create_and_send_otp("+966512345678", OtpChannel.SMS)
|
|
with pytest.raises(OtpCooldownError):
|
|
create_and_send_otp("+966512345678", OtpChannel.SMS)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_otp_max_attempts_blocks_verification():
|
|
otp = PhoneOTP.objects.create(
|
|
phone_number="+966512345678",
|
|
channel=OtpChannel.SMS,
|
|
purpose=OtpPurpose.AUTH,
|
|
provider="console",
|
|
code_hash=make_password("123456"),
|
|
expires_at=PhoneOTP.expiry_at(),
|
|
)
|
|
# Burn attempts with wrong code until the limit is exceeded.
|
|
for _ in range(otp.max_attempts):
|
|
assert verify_otp(otp, "000000") is False
|
|
otp.refresh_from_db()
|
|
assert otp.attempt_count == otp.max_attempts
|
|
|
|
assert verify_otp(otp, "123456") is False
|
|
otp.refresh_from_db()
|
|
assert otp.attempt_count == otp.max_attempts + 1
|
|
assert otp.verified_at is None
|
|
|
|
|
|
@pytest.mark.django_db
|
|
@override_settings(OTP_PROVIDER="console", OTP_RESEND_COOLDOWN_SECONDS=60)
|
|
def test_otp_cooldown_retry_after_seconds():
|
|
fixed_now = timezone.now()
|
|
with patch("apps.accounts.services.otp.timezone.now", return_value=fixed_now):
|
|
result = create_and_send_otp("+966512345678", OtpChannel.SMS)
|
|
# Align created_at with fixed time for deterministic cooldown.
|
|
PhoneOTP.objects.filter(id=result.request_id).update(created_at=fixed_now)
|
|
with pytest.raises(OtpCooldownError) as excinfo:
|
|
create_and_send_otp("+966512345678", OtpChannel.SMS)
|
|
assert excinfo.value.retry_after_seconds == 60
|
|
|
|
|
|
@pytest.mark.django_db
|
|
@override_settings(OTP_PROVIDER="console", OTP_MAX_PER_WINDOW=1, OTP_WINDOW_MINUTES=15, OTP_RESEND_COOLDOWN_SECONDS=0)
|
|
def test_otp_rate_limit_retry_after_seconds():
|
|
fixed_now = timezone.now()
|
|
with patch("apps.accounts.services.otp.timezone.now", return_value=fixed_now):
|
|
result = create_and_send_otp("+966512345678", OtpChannel.SMS)
|
|
# Make the oldest OTP sit 10s before window expiry.
|
|
window_start = fixed_now - timedelta(minutes=15)
|
|
PhoneOTP.objects.filter(id=result.request_id).update(created_at=window_start + timedelta(seconds=10))
|
|
with pytest.raises(OtpRateLimitError) as excinfo:
|
|
create_and_send_otp("+966512345678", OtpChannel.SMS)
|
|
assert excinfo.value.retry_after_seconds == 10
|
|
|
|
|
|
@pytest.mark.django_db
|
|
@override_settings(OTP_PROVIDER="console", OTP_RESEND_COOLDOWN_SECONDS=1)
|
|
def test_otp_resend_after_cooldown_ok():
|
|
result = create_and_send_otp("+966512345678", OtpChannel.SMS)
|
|
# Force cooldown to be elapsed.
|
|
PhoneOTP.objects.filter(id=result.request_id).update(
|
|
created_at=timezone.now() - timedelta(seconds=5)
|
|
)
|
|
create_and_send_otp("+966512345678", OtpChannel.SMS)
|
|
assert PhoneOTP.objects.filter(phone_number="+966512345678").count() == 2
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_verify_otp_rejects_expired():
|
|
otp = PhoneOTP.objects.create(
|
|
phone_number="+966512345678",
|
|
channel=OtpChannel.SMS,
|
|
purpose=OtpPurpose.AUTH,
|
|
provider="console",
|
|
code_hash=make_password("123456"),
|
|
expires_at=timezone.now() - timedelta(minutes=1),
|
|
)
|
|
assert verify_otp(otp, "123456") is False
|
|
otp.refresh_from_db()
|
|
assert otp.attempt_count == 0
|
|
assert otp.verified_at is None
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_verify_otp_rejects_reuse_after_verified():
|
|
otp = PhoneOTP.objects.create(
|
|
phone_number="+966512345678",
|
|
channel=OtpChannel.SMS,
|
|
purpose=OtpPurpose.AUTH,
|
|
provider="console",
|
|
code_hash=make_password("123456"),
|
|
expires_at=PhoneOTP.expiry_at(),
|
|
)
|
|
assert verify_otp(otp, "123456") is True
|
|
otp.refresh_from_db()
|
|
assert otp.attempt_count == 1
|
|
assert verify_otp(otp, "123456") is False
|
|
otp.refresh_from_db()
|
|
assert otp.attempt_count == 1
|