# Python imports from urllib.parse import urlencode, urljoin # Django imports from django.core.exceptions import ValidationError from django.core.validators import validate_email from django.http import HttpResponseRedirect from django.views import View # Module imports from plane.authentication.provider.credentials.email import EmailProvider from plane.authentication.utils.login import user_login from plane.license.models import Instance from plane.authentication.utils.host import base_host from plane.db.models import User from plane.authentication.adapter.error import ( AUTHENTICATION_ERROR_CODES, AuthenticationException, ) class SignInAuthSpaceEndpoint(View): def post(self, request): next_path = request.POST.get("next_path") # Check instance configuration instance = Instance.objects.first() if instance is None or not instance.is_setup_done: # Redirection params exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES[ "INSTANCE_NOT_CONFIGURED" ], error_message="INSTANCE_NOT_CONFIGURED", ) params = exc.get_error_dict() if next_path: params["next_path"] = str(next_path) url = urljoin( base_host(request=request, is_space=True), "accounts/sign-in?" + urlencode(params), ) return HttpResponseRedirect(url) # set the referer as session to redirect after login email = request.POST.get("email", False) password = request.POST.get("password", False) ## Raise exception if any of the above are missing if not email or not password: exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES[ "REQUIRED_EMAIL_PASSWORD_SIGN_IN" ], error_message="REQUIRED_EMAIL_PASSWORD_SIGN_IN", payload={"email": str(email)}, ) params = exc.get_error_dict() if next_path: params["next_path"] = str(next_path) url = urljoin( base_host(request=request, is_space=True), "spaces/accounts/sign-in?" + urlencode(params), ) return HttpResponseRedirect(url) # Validate email email = email.strip().lower() try: validate_email(email) except ValidationError: exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES["INVALID_EMAIL_SIGN_IN"], error_message="INVALID_EMAIL_SIGN_IN", payload={"email": str(email)}, ) params = exc.get_error_dict() if next_path: params["next_path"] = str(next_path) url = urljoin( base_host(request=request, is_space=True), "spaces/accounts/sign-in?" + urlencode(params), ) return HttpResponseRedirect(url) if not User.objects.filter(email=email).exists(): exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES["USER_DOES_NOT_EXIST"], error_message="USER_DOES_NOT_EXIST", payload={"email": str(email)}, ) params = exc.get_error_dict() if next_path: params["next_path"] = str(next_path) url = urljoin( base_host(request=request, is_space=True), "spaces/accounts/sign-in?" + urlencode(params), ) return HttpResponseRedirect(url) try: provider = EmailProvider( request=request, key=email, code=password, is_signup=False ) user = provider.authenticate() # Login the user and record his device info user_login(request=request, user=user) # redirect to next path url = urljoin( base_host(request=request, is_space=True), str(next_path) if next_path else "/", ) return HttpResponseRedirect(url) except AuthenticationException as e: params = e.get_error_dict() if next_path: params["next_path"] = str(next_path) url = urljoin( base_host(request=request, is_space=True), "spaces/accounts/sign-in?" + urlencode(params), ) return HttpResponseRedirect(url) class SignUpAuthSpaceEndpoint(View): def post(self, request): next_path = request.POST.get("next_path") # Check instance configuration instance = Instance.objects.first() if instance is None or not instance.is_setup_done: # Redirection params exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES[ "INSTANCE_NOT_CONFIGURED" ], error_message="INSTANCE_NOT_CONFIGURED", ) params = exc.get_error_dict() if next_path: params["next_path"] = str(next_path) url = urljoin( base_host(request=request, is_space=True), "spaces?" + urlencode(params), ) return HttpResponseRedirect(url) email = request.POST.get("email", False) password = request.POST.get("password", False) ## Raise exception if any of the above are missing if not email or not password: # Redirection params exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES[ "REQUIRED_EMAIL_PASSWORD_SIGN_UP" ], error_message="REQUIRED_EMAIL_PASSWORD_SIGN_UP", payload={"email": str(email)}, ) params = exc.get_error_dict() if next_path: params["next_path"] = str(next_path) url = urljoin( base_host(request=request, is_space=True), "spaces?" + urlencode(params), ) return HttpResponseRedirect(url) # Validate the email email = email.strip().lower() try: validate_email(email) except ValidationError: # Redirection params exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES["INVALID_EMAIL_SIGN_UP"], error_message="INVALID_EMAIL_SIGN_UP", payload={"email": str(email)}, ) params = exc.get_error_dict() if next_path: params["next_path"] = str(next_path) url = urljoin( base_host(request=request, is_space=True), "spaces?" + urlencode(params), ) return HttpResponseRedirect(url) if User.objects.filter(email=email).exists(): exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES["USER_ALREADY_EXIST"], error_message="USER_ALREADY_EXIST", payload={"email": str(email)}, ) params = exc.get_error_dict() if next_path: params["next_path"] = str(next_path) url = urljoin( base_host(request=request, is_space=True), "spaces?" + urlencode(params), ) return HttpResponseRedirect(url) try: provider = EmailProvider( request=request, key=email, code=password, is_signup=True ) user = provider.authenticate() # Login the user and record his device info user_login(request=request, user=user) # redirect to referer path url = urljoin( base_host(request=request, is_space=True), str(next_path) if next_path else "spaces", ) return HttpResponseRedirect(url) except AuthenticationException as e: params = e.get_error_dict() if next_path: params["next_path"] = str(next_path) url = urljoin( base_host(request=request, is_space=True), "spaces?" + urlencode(params), ) return HttpResponseRedirect(url)