[WEB-5430] feat: allow users to change email (#8120)

* feat: change user email

* chore: optimised the logic

* feat: add email change functionality and related modals in profile form

* refactor: format checkEmail method for improved readability

* chore: added rate limit exceeded validation

* feat: implement change email modal with localization support

- Added translation support for the change email modal, including titles, descriptions, and error messages.
- Integrated the useTranslation hook for dynamic text rendering.
- Updated form validation messages to utilize localized strings.
- Enhanced user feedback with localized success and error toast messages.
- Updated button labels and placeholders to reflect localization changes.

* chore: added extra validation in cache key

* fix: format files

---------

Co-authored-by: b-saikrishnakanth <bsaikrishnakanth97@gmail.com>
Co-authored-by: sriramveeraghanta <veeraghanta.sriram@gmail.com>
This commit is contained in:
Bavisetti Narayan 2025-11-24 21:21:52 +05:30 committed by GitHub
parent d6fce114d6
commit ce6299937f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
27 changed files with 2457 additions and 5 deletions

View file

@ -30,6 +30,16 @@ urlpatterns = [
UserEndpoint.as_view({"get": "retrieve_user_settings"}),
name="users",
),
path(
"users/me/email/generate-code/",
UserEndpoint.as_view({"post": "generate_email_verification_code"}),
name="user-email-verify-code",
),
path(
"users/me/email/",
UserEndpoint.as_view({"patch": "update_email"}),
name="user-email-update",
),
# Profile
path("users/me/profile/", ProfileEndpoint.as_view(), name="accounts"),
# End profile

View file

@ -1,10 +1,20 @@
# Python imports
import uuid
import json
import logging
import random
import string
import secrets
# Django imports
from django.db.models import Case, Count, IntegerField, Q, When
from django.contrib.auth import logout
from django.utils import timezone
from django.utils.decorators import method_decorator
from django.views.decorators.cache import cache_control
from django.views.decorators.vary import vary_on_cookie
from django.core.validators import validate_email
from django.core.cache import cache
# Third party imports
from rest_framework import status
@ -36,9 +46,11 @@ from plane.utils.paginator import BasePaginator
from plane.authentication.utils.host import user_ip
from plane.bgtasks.user_deactivation_email_task import user_deactivation_email
from plane.utils.host import base_host
from django.utils.decorators import method_decorator
from django.views.decorators.cache import cache_control
from django.views.decorators.vary import vary_on_cookie
from plane.bgtasks.user_email_update_task import send_email_update_magic_code, send_email_update_confirmation
from plane.authentication.rate_limit import EmailVerificationThrottle
logger = logging.getLogger("plane")
class UserEndpoint(BaseViewSet):
@ -49,6 +61,14 @@ class UserEndpoint(BaseViewSet):
def get_object(self):
return self.request.user
def get_throttles(self):
"""
Apply rate limiting to specific endpoints.
"""
if self.action == "generate_email_verification_code":
return [EmailVerificationThrottle()]
return super().get_throttles()
@method_decorator(cache_control(private=True, max_age=12))
@method_decorator(vary_on_cookie)
def retrieve(self, request):
@ -69,6 +89,169 @@ class UserEndpoint(BaseViewSet):
def partial_update(self, request, *args, **kwargs):
return super().partial_update(request, *args, **kwargs)
def _validate_new_email(self, user, new_email):
"""
Validate the new email address.
Args:
user: The User instance
new_email: The new email address to validate
Returns:
Response object with error if validation fails, None if validation passes
"""
if not new_email:
return Response(
{"error": "Email is required"},
status=status.HTTP_400_BAD_REQUEST,
)
# Validate email format
try:
validate_email(new_email)
except Exception:
return Response(
{"error": "Invalid email format"},
status=status.HTTP_400_BAD_REQUEST,
)
# Check if email is the same as current email
if new_email == user.email:
return Response(
{"error": "New email must be different from current email"},
status=status.HTTP_400_BAD_REQUEST,
)
# Check if email already exists in the User model
if User.objects.filter(email=new_email).exclude(id=user.id).exists():
return Response(
{"error": "An account with this email already exists"},
status=status.HTTP_400_BAD_REQUEST,
)
return None
def generate_email_verification_code(self, request):
"""
Generate and send a magic code to the new email address for verification.
Rate limited to 3 requests per hour per user (enforced by EmailVerificationThrottle).
Additional per-email cooldown of 60 seconds prevents rapid repeated requests.
"""
user = self.get_object()
new_email = request.data.get("email", "").strip().lower()
# Validate the new email
validation_error = self._validate_new_email(user, new_email)
if validation_error:
return validation_error
try:
# Generate magic code for email verification
# Use a special key prefix to distinguish from regular magic signin
# Include user ID to bind the code to the specific user
cache_key = f"magic_email_update_{user.id}_{new_email}"
## Generate a random token
token = (
"".join(secrets.choice(string.ascii_lowercase) for _ in range(4))
+ "-"
+ "".join(secrets.choice(string.ascii_lowercase) for _ in range(4))
+ "-"
+ "".join(secrets.choice(string.ascii_lowercase) for _ in range(4))
)
# Store in cache with 10 minute expiration
cache_data = json.dumps({"token": token})
cache.set(cache_key, cache_data, timeout=600)
# Send magic code to the new email
send_email_update_magic_code.delay(new_email, token)
return Response(
{"message": "Verification code sent to email"},
status=status.HTTP_200_OK,
)
except Exception as e:
logger.error("Failed to generate verification code: %s", str(e), exc_info=True)
return Response(
{"error": "Failed to generate verification code. Please try again."},
status=status.HTTP_400_BAD_REQUEST,
)
def update_email(self, request):
"""
Verify the magic code and update the user's email address.
This endpoint verifies the code and updates the existing user record
without creating a new user, ensuring the user ID remains unchanged.
"""
user = self.get_object()
new_email = request.data.get("email", "").strip().lower()
code = request.data.get("code", "").strip()
# Validate the new email
validation_error = self._validate_new_email(user, new_email)
if validation_error:
return validation_error
if not code:
return Response(
{"error": "Verification code is required"},
status=status.HTTP_400_BAD_REQUEST,
)
# Verify the magic code
try:
cache_key = f"magic_email_update_{user.id}_{new_email}"
cached_data = cache.get(cache_key)
if not cached_data:
logger.warning("Cache key not found: %s. Code may have expired or was never generated.", cache_key)
return Response(
{"error": "Verification code has expired or is invalid"},
status=status.HTTP_400_BAD_REQUEST,
)
data = json.loads(cached_data)
stored_token = data.get("token")
if str(stored_token) != str(code):
return Response(
{"error": "Invalid verification code"},
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
return Response(
{"error": "Failed to verify code. Please try again."},
status=status.HTTP_400_BAD_REQUEST,
)
# Final check: ensure email is still available (might have been taken between code generation and update)
if User.objects.filter(email=new_email).exclude(id=user.id).exists():
return Response(
{"error": "An account with this email already exists"},
status=status.HTTP_400_BAD_REQUEST,
)
old_email = user.email
# Update the email - this updates the existing user record without creating a new user
user.email = new_email
# Reset email verification status when email is changed
user.is_email_verified = False
user.save()
# delete the cache
cache.delete(cache_key)
# Logout the user
logout(request)
# Send confirmation email to the new email address
send_email_update_confirmation.delay(new_email)
# send the email to the old email address
send_email_update_confirmation.delay(old_email)
# Return updated user data
serialized_data = UserMeSerializer(user).data
return Response(serialized_data, status=status.HTTP_200_OK)
def deactivate(self, request):
# Check all workspace user is active
user = self.get_object()