Initial commit
This commit is contained in:
@@ -0,0 +1,83 @@
|
||||
import logging
|
||||
import secrets
|
||||
from dataclasses import dataclass
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.hashers import check_password, make_password
|
||||
from django.utils import timezone
|
||||
|
||||
from apps.accounts.models import OtpChannel, PhoneOTP
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class OtpSendResult:
|
||||
request_id: str
|
||||
expires_at: str
|
||||
|
||||
|
||||
class BaseOtpProvider:
|
||||
def send_sms(self, to_number: str, message: str) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def send_whatsapp(self, to_number: str, message: str) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class ConsoleOtpProvider(BaseOtpProvider):
|
||||
def send_sms(self, to_number: str, message: str) -> None:
|
||||
logger.info("OTP SMS to %s: %s", to_number, message)
|
||||
|
||||
def send_whatsapp(self, to_number: str, message: str) -> None:
|
||||
logger.info("OTP WhatsApp to %s: %s", to_number, message)
|
||||
|
||||
|
||||
PROVIDERS = {
|
||||
"console": ConsoleOtpProvider,
|
||||
}
|
||||
|
||||
|
||||
def get_provider() -> BaseOtpProvider:
|
||||
provider_key = settings.OTP_PROVIDER
|
||||
provider_cls = PROVIDERS.get(provider_key)
|
||||
if not provider_cls:
|
||||
raise ValueError(f"Unknown OTP provider: {provider_key}")
|
||||
return provider_cls()
|
||||
|
||||
|
||||
def generate_code(length: int = 6) -> str:
|
||||
digits = "0123456789"
|
||||
return "".join(secrets.choice(digits) for _ in range(length))
|
||||
|
||||
|
||||
def create_and_send_otp(phone_number: str, channel: str) -> OtpSendResult:
|
||||
provider = get_provider()
|
||||
code = generate_code()
|
||||
otp = PhoneOTP.objects.create(
|
||||
phone_number=phone_number,
|
||||
channel=channel,
|
||||
provider=settings.OTP_PROVIDER,
|
||||
code_hash=make_password(code),
|
||||
expires_at=PhoneOTP.expiry_at(),
|
||||
)
|
||||
|
||||
message = f"Your verification code is {code}. It expires in {settings.OTP_EXPIRY_MINUTES} minutes."
|
||||
if channel == OtpChannel.SMS:
|
||||
provider.send_sms(phone_number, message)
|
||||
elif channel == OtpChannel.WHATSAPP:
|
||||
provider.send_whatsapp(phone_number, message)
|
||||
else:
|
||||
raise ValueError("Unsupported OTP channel")
|
||||
|
||||
return OtpSendResult(request_id=str(otp.id), expires_at=otp.expires_at.isoformat())
|
||||
|
||||
|
||||
def verify_otp(otp: PhoneOTP, code: str) -> bool:
|
||||
if otp.verified_at or otp.is_expired():
|
||||
return False
|
||||
if not check_password(code, otp.code_hash):
|
||||
return False
|
||||
otp.verified_at = timezone.now()
|
||||
otp.save(update_fields=["verified_at"])
|
||||
return True
|
||||
Reference in New Issue
Block a user