# Third party imports from rest_framework import status from rest_framework.permissions import AllowAny from rest_framework.response import Response from rest_framework.views import APIView from zxcvbn import zxcvbn ## Module imports from plane.app.serializers import ( ChangePasswordSerializer, UserSerializer, ) from plane.authentication.utils.login import user_login from plane.db.models import User from plane.authentication.adapter.error import ( AuthenticationException, AUTHENTICATION_ERROR_CODES, ) from django.middleware.csrf import get_token class CSRFTokenEndpoint(APIView): permission_classes = [ AllowAny, ] def get(self, request): # Generate a CSRF token csrf_token = get_token(request) # Return the CSRF token in a JSON response return Response( {"csrf_token": str(csrf_token)}, status=status.HTTP_200_OK ) class ChangePasswordEndpoint(APIView): def post(self, request): serializer = ChangePasswordSerializer(data=request.data) user = User.objects.get(pk=request.user.id) if serializer.is_valid(): if not user.check_password(serializer.data.get("old_password")): exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES[ "INCORRECT_OLD_PASSWORD" ], error_message="INCORRECT_OLD_PASSWORD", payload={"error": "Old password is not correct"}, ) return Response( exc.get_error_dict(), status=status.HTTP_400_BAD_REQUEST, ) # check the password score results = zxcvbn(serializer.data.get("new_password")) if results["score"] < 3: exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES[ "INVALID_NEW_PASSWORD" ], error_message="INVALID_NEW_PASSWORD", ) return Response( exc.get_error_dict(), status=status.HTTP_400_BAD_REQUEST, ) # set_password also hashes the password that the user will get user.set_password(serializer.data.get("new_password")) user.is_password_autoset = False user.save() user_login(user=user, request=request) return Response( {"message": "Password updated successfully"}, status=status.HTTP_200_OK, ) exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES["INVALID_PASSWORD"], error_message="INVALID_PASSWORD", ) return Response( exc.get_error_dict(), status=status.HTTP_400_BAD_REQUEST, ) class SetUserPasswordEndpoint(APIView): def post(self, request): user = User.objects.get(pk=request.user.id) password = request.data.get("password", False) # If the user password is not autoset then return error if not user.is_password_autoset: exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES["PASSWORD_ALREADY_SET"], error_message="PASSWORD_ALREADY_SET", payload={ "error": "Your password is already set please change your password from profile" }, ) return Response( exc.get_error_dict(), status=status.HTTP_400_BAD_REQUEST, ) # Check password validation if not password: exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES["INVALID_PASSWORD"], error_message="INVALID_PASSWORD", ) return Response( exc.get_error_dict(), status=status.HTTP_400_BAD_REQUEST, ) results = zxcvbn(password) if results["score"] < 3: exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES["INVALID_PASSWORD"], error_message="INVALID_PASSWORD", ) return Response( exc.get_error_dict(), status=status.HTTP_400_BAD_REQUEST, ) # Set the user password user.set_password(password) user.is_password_autoset = False user.save() # Login the user as the session is invalidated user_login(user=user, request=request) # Return the user serializer = UserSerializer(user) return Response(serializer.data, status=status.HTTP_200_OK)