Added Authentica OTP

This commit is contained in:
2026-02-28 16:58:50 +03:00
parent a1da918f95
commit 4253f6f650
4 changed files with 230 additions and 20 deletions
+123 -16
View File
@@ -1,8 +1,8 @@
import logging
import os
import secrets
from datetime import timedelta
from dataclasses import dataclass
from datetime import timedelta
from django.conf import settings
from django.contrib.auth.hashers import check_password, make_password
@@ -34,12 +34,20 @@ class OtpCooldownError(RuntimeError):
class BaseOtpProvider:
uses_provider_otp = False
def send_sms(self, to_number: str, message: str) -> None:
raise NotImplementedError
def send_whatsapp(self, to_number: str, message: str) -> None:
raise NotImplementedError
def send_otp(self, to_number: str, channel: str) -> None:
raise NotImplementedError
def verify_otp(self, to_number: str, code: str) -> bool:
raise NotImplementedError
class ConsoleOtpProvider(BaseOtpProvider):
def send_sms(self, to_number: str, message: str) -> None:
@@ -102,21 +110,99 @@ class UnifonicOtpProvider(BaseOtpProvider):
raise NotImplementedError(_("Unifonic WhatsApp adapter not implemented yet"))
class AuthenticaOtpProvider(BaseOtpProvider):
"""Authentica provider for OTP delivery (SMS/WhatsApp) plus custom SMS messaging."""
uses_provider_otp = True
def __init__(self) -> None:
self.api_key = os.getenv("AUTHENTICA_API_KEY")
self.base_url = os.getenv("AUTHENTICA_BASE_URL", "https://api.authentica.sa")
self.timeout_seconds = float(os.getenv("AUTHENTICA_TIMEOUT_SECONDS", "10"))
self.sender_name = os.getenv("AUTHENTICA_SENDER_NAME")
def _assert_config(self) -> None:
if not self.api_key:
raise ValueError(_("Authentica API key is not configured"))
def _headers(self) -> dict:
return {
"Accept": "application/json",
"Content-Type": "application/json",
"X-Authorization": self.api_key,
}
def _post(self, path: str, payload: dict) -> dict:
import requests
self._assert_config()
base_url = self.base_url.rstrip("/")
url = f"{base_url}{path}"
try:
response = requests.post(
url,
headers=self._headers(),
json=payload,
timeout=self.timeout_seconds,
)
except requests.RequestException as exc:
raise RuntimeError(_("Authentica request failed")) from exc
try:
data = response.json()
except ValueError:
data = {"detail": response.text}
if not response.ok:
raise RuntimeError(_("Authentica request failed"))
return data
def send_otp(self, to_number: str, channel: str) -> None:
if channel not in (OtpChannel.SMS, OtpChannel.WHATSAPP):
raise ValueError(_("Unsupported OTP channel"))
method = "sms" if channel == OtpChannel.SMS else "whatsapp"
self._post("/api/v2/send-otp", {"method": method, "phone": to_number})
def verify_otp(self, to_number: str, code: str) -> bool:
data = self._post("/api/v2/verify-otp", {"phone": to_number, "otp": code})
return bool(data.get("verified"))
def send_sms(self, to_number: str, message: str) -> None:
if not self.sender_name:
raise ValueError(_("Authentica sender name is not configured"))
self._post(
"/api/v2/send-sms",
{
"phone": to_number,
"message": message,
"sender_name": self.sender_name,
},
)
def send_whatsapp(self, to_number: str, message: str) -> None:
raise ValueError(_("Authentica WhatsApp messaging is not supported"))
PROVIDERS = {
"console": ConsoleOtpProvider,
"twilio": TwilioOtpProvider,
"unifonic": UnifonicOtpProvider,
"authentica": AuthenticaOtpProvider,
}
def get_provider() -> BaseOtpProvider:
provider_key = settings.OTP_PROVIDER
def _get_provider_for_key(provider_key: str) -> BaseOtpProvider:
provider_cls = PROVIDERS.get(provider_key)
if not provider_cls:
raise ValueError(_("Unknown OTP provider: %(provider)s") % {"provider": provider_key})
return provider_cls()
def get_provider() -> BaseOtpProvider:
return _get_provider_for_key(settings.OTP_PROVIDER)
def generate_code(length: int = 6) -> str:
digits = "0123456789"
return "".join(secrets.choice(digits) for _ in range(length))
@@ -149,25 +235,34 @@ def create_and_send_otp(phone_number: str, channel: str, purpose: str = OtpPurpo
if elapsed < cooldown_seconds:
raise OtpCooldownError(retry_after_seconds=int(cooldown_seconds - elapsed))
code = generate_code()
if provider.uses_provider_otp:
code_hash = make_password(secrets.token_urlsafe(16))
message = None
else:
code = generate_code()
code_hash = make_password(code)
message = _(
"Your verification code is %(code)s. It expires in %(minutes)s minutes."
) % {"code": code, "minutes": settings.OTP_EXPIRY_MINUTES}
otp = PhoneOTP.objects.create(
phone_number=phone_number,
channel=channel,
purpose=purpose,
provider=settings.OTP_PROVIDER,
code_hash=make_password(code),
code_hash=code_hash,
expires_at=PhoneOTP.expiry_at(),
)
message = _(
"Your verification code is %(code)s. It expires in %(minutes)s minutes."
) % {"code": code, "minutes": settings.OTP_EXPIRY_MINUTES}
if channel == OtpChannel.SMS:
provider.send_sms(phone_number, message)
elif channel == OtpChannel.WHATSAPP:
provider.send_whatsapp(phone_number, message)
if provider.uses_provider_otp:
provider.send_otp(phone_number, channel)
else:
raise ValueError(_("Unsupported OTP channel"))
if channel == OtpChannel.SMS:
provider.send_sms(phone_number, message)
elif channel == OtpChannel.WHATSAPP:
provider.send_whatsapp(phone_number, message)
else:
raise ValueError(_("Unsupported OTP channel"))
return OtpSendResult(request_id=str(otp.id), expires_at=otp.expires_at.isoformat())
@@ -179,9 +274,21 @@ def verify_otp(otp: PhoneOTP, code: str) -> bool:
if otp.attempt_count > otp.max_attempts:
otp.save(update_fields=["attempt_count"])
return False
if not check_password(code, otp.code_hash):
otp.save(update_fields=["attempt_count"])
return False
provider_cls = PROVIDERS.get(otp.provider)
if provider_cls and getattr(provider_cls, "uses_provider_otp", False):
provider = provider_cls()
try:
verified = provider.verify_otp(otp.phone_number, code)
except Exception:
otp.save(update_fields=["attempt_count"])
raise
if not verified:
otp.save(update_fields=["attempt_count"])
return False
else:
if not check_password(code, otp.code_hash):
otp.save(update_fields=["attempt_count"])
return False
otp.verified_at = timezone.now()
otp.save(update_fields=["verified_at", "attempt_count"])
return True