# Salon/backend/apps/accounts/views.py
# (228 lines, 8.4 KiB, Python)

from django.contrib.auth import get_user_model
from django.db import IntegrityError
from rest_framework import generics, permissions, status
from rest_framework.response import Response
from django.utils.translation import gettext as _
from rest_framework.views import APIView
from rest_framework_simplejwt.tokens import RefreshToken
from apps.accounts.models import OtpPurpose, PhoneOTP
from apps.accounts.serializers import (
OTPRequestSerializer,
OTPVerifySerializer,
PhoneAuthRequestSerializer,
PhoneAuthVerifySerializer,
RegisterSerializer,
UserSerializer,
)
from apps.accounts.services.otp import (
OtpCooldownError,
OtpDeviceRateLimitError,
OtpIpRateLimitError,
OtpRateLimitError,
build_device_signal,
create_and_send_otp,
enforce_phone_auth_request_limits,
normalize_request_ip,
verify_otp,
)
User = get_user_model()
class RegisterView(generics.CreateAPIView):
    """Create-only endpoint backed by ``RegisterSerializer``.

    No authentication is required to call it (``AllowAny``).
    """

    permission_classes = [permissions.AllowAny]
    serializer_class = RegisterSerializer
class MeView(generics.RetrieveUpdateAPIView):
    """Retrieve/update endpoint for the currently authenticated user.

    The payload is rendered and validated with ``UserSerializer``.
    """

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = UserSerializer

    def get_object(self):
        """Return the requesting user instead of looking one up by URL kwarg."""
        current_user = self.request.user
        return current_user
class OTPRequestView(APIView):
    """Issue a verify-purpose OTP for a phone number.

    Responses:
        201 -- ``request_id`` and ``expires_at`` of the created OTP.
        429 -- a cooldown or rate limit was hit; the body carries
               ``detail`` and ``retry_after_seconds``.
    """

    permission_classes = [permissions.AllowAny]

    def post(self, request):
        """Validate the request body, then create and send the OTP."""
        serializer = OTPRequestSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        payload = serializer.validated_data

        try:
            issued = create_and_send_otp(
                payload["phone_number"],
                payload["channel"],
                purpose=OtpPurpose.VERIFY,
            )
        except (OtpCooldownError, OtpRateLimitError) as exc:
            # Cooldown and rate-limit failures share one response shape.
            throttled = {"detail": str(exc), "retry_after_seconds": exc.retry_after_seconds}
            return Response(throttled, status=status.HTTP_429_TOO_MANY_REQUESTS)

        created = {"request_id": issued.request_id, "expires_at": issued.expires_at}
        return Response(created, status=status.HTTP_201_CREATED)
class OTPVerifyView(APIView):
    """Check a verify-purpose OTP and flag the matching user's phone as verified.

    Responses:
        200 -- the code was accepted.
        400 -- the request id is unknown for this purpose, or the code was rejected.
    """

    permission_classes = [permissions.AllowAny]

    def post(self, request):
        """Validate the body, verify the code, and update the user if one exists."""
        serializer = OTPVerifySerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        payload = serializer.validated_data

        # Purpose isolation: verification endpoint accepts only verify-purpose OTPs.
        otp = PhoneOTP.objects.filter(
            id=payload["request_id"],
            purpose=OtpPurpose.VERIFY,
        ).first()

        # An unknown request id and a rejected code produce the same response.
        if not otp or not verify_otp(otp, payload["code"]):
            return Response(
                {"detail": _("Invalid or expired code")},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # A user with this phone number may not exist; that is not an error here.
        account = User.objects.filter(phone_number=otp.phone_number).first()
        if account and not account.is_phone_verified:
            account.is_phone_verified = True
            account.save(update_fields=["is_phone_verified"])

        return Response({"detail": _("Phone verified")}, status=status.HTTP_200_OK)
class PhoneAuthRequestView(APIView):
    """Begin phone login: apply request limits, ensure a user exists, send an OTP.

    Responses:
        201 -- ``request_id`` and ``expires_at`` of the auth-purpose OTP.
        400 -- the supplied email already belongs to a user.
        429 -- an IP, device, cooldown, or rate limit was hit; the body carries
               ``detail`` and ``retry_after_seconds``.
    """

    permission_classes = [permissions.AllowAny]

    def post(self, request):
        """Validate the body, enforce limits, create the user if needed, send the OTP."""
        serializer = PhoneAuthRequestSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data
        phone_number = data["phone_number"]

        # NOTE(review): X-Forwarded-For is supplied by the client unless a trusted
        # proxy overwrites it, and it may hold a comma-separated chain. Confirm that
        # normalize_request_ip / the deployment handle this, otherwise the per-IP
        # limit can be sidestepped by varying the header.
        request_ip = normalize_request_ip(
            request.META.get("HTTP_X_FORWARDED_FOR") or request.META.get("REMOTE_ADDR")
        )
        device_signal = build_device_signal(
            data.get("device_id"),
            request.META.get("HTTP_USER_AGENT"),
            request.META.get("HTTP_ACCEPT_LANGUAGE"),
        )

        try:
            enforce_phone_auth_request_limits(request_ip=request_ip, device_signal=device_signal)
        except (OtpIpRateLimitError, OtpDeviceRateLimitError) as exc:
            return self._too_many_requests(exc)

        conflict = self._ensure_user_exists(phone_number, data)
        if conflict is not None:
            return conflict

        try:
            result = create_and_send_otp(
                phone_number,
                data["channel"],
                purpose=OtpPurpose.AUTH,
                request_ip=request_ip,
                device_signal=device_signal,
            )
        except (OtpCooldownError, OtpRateLimitError) as exc:
            return self._too_many_requests(exc)

        return Response(
            {"request_id": result.request_id, "expires_at": result.expires_at},
            status=status.HTTP_201_CREATED,
        )

    @staticmethod
    def _too_many_requests(exc):
        """Build the 429 response shared by every limit and cooldown error."""
        return Response(
            {"detail": str(exc), "retry_after_seconds": exc.retry_after_seconds},
            status=status.HTTP_429_TOO_MANY_REQUESTS,
        )

    @staticmethod
    def _email_in_use():
        """Build the 400 response returned when the email belongs to a user."""
        return Response(
            {"detail": _("Email already in use.")},
            status=status.HTTP_400_BAD_REQUEST,
        )

    def _ensure_user_exists(self, phone_number, data):
        """Create a customer user for ``phone_number`` unless one already exists.

        Args:
            phone_number: Validated phone number from the request.
            data: Validated serializer data; ``email``, ``first_name`` and
                ``last_name`` are read from it when present.

        Returns:
            ``None`` when a user with this phone number exists afterwards, or a
            400 ``Response`` when the supplied email is already taken.

        Raises:
            IntegrityError: Re-raised when creation failed and neither an
                existing phone number nor an existing email explains it.
        """
        # Function-scope import keeps this block self-contained.
        from django.db import transaction

        users = User.objects
        if users.filter(phone_number=phone_number).exists():
            return None

        email = data.get("email") or None
        if email and users.filter(email=email).exists():
            return self._email_in_use()

        try:
            # The savepoint keeps the connection usable after an IntegrityError.
            # Without it, the recovery queries below fail with
            # TransactionManagementError whenever this view already runs inside
            # a transaction (ATOMIC_REQUESTS, TestCase).
            with transaction.atomic():
                users.create_user(
                    email=email,
                    phone_number=phone_number,
                    first_name=data.get("first_name", ""),
                    last_name=data.get("last_name", ""),
                    role="customer",
                )
        except IntegrityError:
            # Another worker may have claimed this phone or email after our initial checks.
            if users.filter(phone_number=phone_number).exists():
                return None
            if email and users.filter(email=email).exists():
                return self._email_in_use()
            raise
        return None
class PhoneAuthVerifyView(APIView):
    """Finish phone login: check an auth-purpose OTP and issue JWT tokens.

    Responses:
        200 -- ``refresh`` and ``access`` tokens plus the serialized ``user``.
        400 -- the request id is unknown for this purpose, or the code was rejected.
        403 -- the code was valid but the account is deactivated.
        404 -- no user has the phone number the OTP was issued for.
    """

    permission_classes = [permissions.AllowAny]

    def post(self, request):
        """Validate the body, verify the code, and return tokens for the user."""
        serializer = PhoneAuthVerifySerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data

        # Purpose isolation: login endpoint accepts only auth-purpose OTPs.
        otp = PhoneOTP.objects.filter(id=data["request_id"], purpose=OtpPurpose.AUTH).first()

        # An unknown request id and a rejected code produce the same response.
        if not otp or not verify_otp(otp, data["code"]):
            return Response(
                {"detail": _("Invalid or expired code")},
                status=status.HTTP_400_BAD_REQUEST,
            )

        user = User.objects.filter(phone_number=otp.phone_number).first()
        if not user:
            return Response({"detail": _("User not found")}, status=status.HTTP_404_NOT_FOUND)

        # Do not issue tokens for deactivated accounts. This runs only after the
        # OTP check so account status is disclosed solely to a caller who
        # presented a valid code for this phone number.
        if not user.is_active:
            return Response(
                {"detail": _("User account is disabled.")},
                status=status.HTTP_403_FORBIDDEN,
            )

        if not user.is_phone_verified:
            user.is_phone_verified = True
            user.save(update_fields=["is_phone_verified"])

        refresh = RefreshToken.for_user(user)
        return Response(
            {
                "refresh": str(refresh),
                "access": str(refresh.access_token),
                "user": UserSerializer(user).data,
            },
            status=status.HTTP_200_OK,
        )
class SocialLoginPlaceholderView(APIView):
    """Stub endpoint for social login; every POST answers 501 Not Implemented."""

    permission_classes = [permissions.AllowAny]

    def post(self, request, provider):
        """Return the fixed 501 response; ``provider`` is accepted but not used."""
        message = _("Social login not configured yet. Add OAuth provider config.")
        return Response({"detail": message}, status=status.HTTP_501_NOT_IMPLEMENTED)
class PasswordTokenObtainDeprecatedView(APIView):
    """Tombstone for password login; every POST answers 410 Gone.

    The response text points callers at the phone request/verify endpoints.
    """

    permission_classes = [permissions.AllowAny]

    def post(self, request):
        """Return the fixed 410 response describing the replacement flow."""
        message = _(
            "Password login is deprecated. Use /api/auth/phone/request/ then /api/auth/phone/verify/."
        )
        return Response({"detail": message}, status=status.HTTP_410_GONE)