bb-plane-fork/apiserver/plane/authentication/views/space/email.py
Anmol Singh Bhatia 9b7b23f5a2
[WEB-1309] fix: auth fixes (#4456)
* dev: magic link login and email password disable

* dev: user account deactivation

* dev: change nginx conf routes

* feat: changemod space

* fix: space app dir fixes

* dev: invalidate cache for instances when creating workspace

* dev: update email templates for test email

* dev: fix build errors

* fix: auth fixes and improvement (#4452)

* chore: change password api updated and missing password error code added

* chore: auth helper updated

* chore: disable send code input suggestion

* chore: change password function updated

* fix: application error on sign in page

* chore: change password validation added and enhancement

* dev: space base path in web

* dev: admin user deactivated

* dev: user and instance admin session endpoint

* fix: last_workspace_id endpoint updated

* fix: magic sign in and email password check added

---------

Co-authored-by: pablohashescobar <nikhilschacko@gmail.com>
Co-authored-by: sriram veeraghanta <veeraghanta.sriram@gmail.com>
Co-authored-by: guru_sainath <gurusainath007@gmail.com>
2024-05-14 20:53:51 +05:30

262 lines
9.5 KiB
Python

# Python imports
from urllib.parse import urlencode, urljoin
# Django imports
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.http import HttpResponseRedirect
from django.views import View
# Module imports
from plane.authentication.provider.credentials.email import EmailProvider
from plane.authentication.utils.login import user_login
from plane.license.models import Instance
from plane.authentication.utils.host import base_host
from plane.db.models import User
from plane.authentication.adapter.error import (
AUTHENTICATION_ERROR_CODES,
AuthenticationException,
)
class SignInAuthSpaceEndpoint(View):
    """Handle the email + password sign-in form posted by the space app.

    Every outcome is an HTTP redirect to the space host returned by
    ``base_host(request=request, is_space=True)``:

    * on failure, the query string carries the dict produced by
      ``AuthenticationException.get_error_dict()`` (plus ``next_path`` when
      one was supplied and accepted);
    * on success, the user is logged in and redirected to ``next_path``
      resolved against the space host.
    """

    @staticmethod
    def _safe_next_path(next_path):
        """Return ``next_path`` if it is a same-host relative path, else ``""``.

        ``next_path`` comes straight from the POST body and is later fed to
        ``urljoin``. ``urljoin`` lets an absolute URL (``https://evil.test``)
        or a network-path reference (``//evil.test``) replace the host of the
        base URL, which would turn this endpoint into an open redirect.
        Anything carrying a scheme or a network location is therefore dropped.
        """
        # Local import so this block does not depend on the file-level imports.
        from urllib.parse import urlparse

        if not next_path:
            return ""
        candidate = str(next_path).strip()
        # Browsers commonly treat "\" like "/", so "/\evil.test" must be
        # checked as if it were "//evil.test".
        parsed = urlparse(candidate.replace("\\", "/"))
        if parsed.scheme or parsed.netloc:
            return ""
        return candidate

    @staticmethod
    def _error_redirect(request, exc, next_path):
        """Redirect to the space host with ``exc``'s error dict as the query."""
        params = exc.get_error_dict()
        if next_path:
            params["next_path"] = str(next_path)
        url = urljoin(
            base_host(request=request, is_space=True),
            "?" + urlencode(params),
        )
        return HttpResponseRedirect(url)

    def post(self, request):
        """Validate the submitted credentials and redirect accordingly.

        Reads ``email``, ``password`` and the optional ``next_path`` from
        ``request.POST``. Always returns an ``HttpResponseRedirect``.
        """
        # Validate once up front so both the error and success redirects
        # only ever see a same-host relative path.
        next_path = self._safe_next_path(request.POST.get("next_path"))

        # Check instance configuration
        instance = Instance.objects.first()
        if instance is None or not instance.is_setup_done:
            return self._error_redirect(
                request,
                AuthenticationException(
                    error_code=AUTHENTICATION_ERROR_CODES[
                        "INSTANCE_NOT_CONFIGURED"
                    ],
                    error_message="INSTANCE_NOT_CONFIGURED",
                ),
                next_path,
            )

        email = request.POST.get("email", False)
        password = request.POST.get("password", False)

        # Both fields are required
        if not email or not password:
            return self._error_redirect(
                request,
                AuthenticationException(
                    error_code=AUTHENTICATION_ERROR_CODES[
                        "REQUIRED_EMAIL_PASSWORD_SIGN_IN"
                    ],
                    error_message="REQUIRED_EMAIL_PASSWORD_SIGN_IN",
                    payload={"email": str(email)},
                ),
                next_path,
            )

        # Normalise, then validate the email
        email = email.strip().lower()
        try:
            validate_email(email)
        except ValidationError:
            return self._error_redirect(
                request,
                AuthenticationException(
                    error_code=AUTHENTICATION_ERROR_CODES[
                        "INVALID_EMAIL_SIGN_IN"
                    ],
                    error_message="INVALID_EMAIL_SIGN_IN",
                    payload={"email": str(email)},
                ),
                next_path,
            )

        # Sign-in requires an existing, active user
        existing_user = User.objects.filter(email=email).first()
        if not existing_user:
            return self._error_redirect(
                request,
                AuthenticationException(
                    error_code=AUTHENTICATION_ERROR_CODES[
                        "USER_DOES_NOT_EXIST"
                    ],
                    error_message="USER_DOES_NOT_EXIST",
                    payload={"email": str(email)},
                ),
                next_path,
            )

        if not existing_user.is_active:
            return self._error_redirect(
                request,
                AuthenticationException(
                    error_code=AUTHENTICATION_ERROR_CODES[
                        "USER_ACCOUNT_DEACTIVATED"
                    ],
                    error_message="USER_ACCOUNT_DEACTIVATED",
                ),
                next_path,
            )

        try:
            provider = EmailProvider(
                request=request, key=email, code=password, is_signup=False
            )
            user = provider.authenticate()
            # Login the user and record their device info
            user_login(request=request, user=user, is_space=True)
        except AuthenticationException as e:
            return self._error_redirect(request, e, next_path)

        # Redirect to the (validated) next path; "" resolves to the space host
        url = urljoin(
            base_host(request=request, is_space=True),
            next_path,
        )
        return HttpResponseRedirect(url)
class SignUpAuthSpaceEndpoint(View):
    """Handle the email + password sign-up form posted by the space app.

    Every outcome is an HTTP redirect to the space host returned by
    ``base_host(request=request, is_space=True)``:

    * on failure, the query string carries the dict produced by
      ``AuthenticationException.get_error_dict()`` (plus ``next_path`` when
      one was supplied and accepted);
    * on success, the new user is logged in and redirected to ``next_path``
      resolved against the space host.
    """

    @staticmethod
    def _safe_next_path(next_path):
        """Return ``next_path`` if it is a same-host relative path, else ``""``.

        ``next_path`` comes straight from the POST body and is later fed to
        ``urljoin``. ``urljoin`` lets an absolute URL (``https://evil.test``)
        or a network-path reference (``//evil.test``) replace the host of the
        base URL, which would turn this endpoint into an open redirect.
        Anything carrying a scheme or a network location is therefore dropped.
        """
        # Local import so this block does not depend on the file-level imports.
        from urllib.parse import urlparse

        if not next_path:
            return ""
        candidate = str(next_path).strip()
        # Browsers commonly treat "\" like "/", so "/\evil.test" must be
        # checked as if it were "//evil.test".
        parsed = urlparse(candidate.replace("\\", "/"))
        if parsed.scheme or parsed.netloc:
            return ""
        return candidate

    @staticmethod
    def _error_redirect(request, exc, next_path):
        """Redirect to the space host with ``exc``'s error dict as the query."""
        params = exc.get_error_dict()
        if next_path:
            params["next_path"] = str(next_path)
        url = urljoin(
            base_host(request=request, is_space=True),
            "?" + urlencode(params),
        )
        return HttpResponseRedirect(url)

    def post(self, request):
        """Validate the submitted details, create the account and redirect.

        Reads ``email``, ``password`` and the optional ``next_path`` from
        ``request.POST``. Always returns an ``HttpResponseRedirect``.
        """
        # Validate once up front so both the error and success redirects
        # only ever see a same-host relative path.
        next_path = self._safe_next_path(request.POST.get("next_path"))

        # Check instance configuration
        instance = Instance.objects.first()
        if instance is None or not instance.is_setup_done:
            return self._error_redirect(
                request,
                AuthenticationException(
                    error_code=AUTHENTICATION_ERROR_CODES[
                        "INSTANCE_NOT_CONFIGURED"
                    ],
                    error_message="INSTANCE_NOT_CONFIGURED",
                ),
                next_path,
            )

        email = request.POST.get("email", False)
        password = request.POST.get("password", False)

        # Both fields are required
        if not email or not password:
            return self._error_redirect(
                request,
                AuthenticationException(
                    error_code=AUTHENTICATION_ERROR_CODES[
                        "REQUIRED_EMAIL_PASSWORD_SIGN_UP"
                    ],
                    error_message="REQUIRED_EMAIL_PASSWORD_SIGN_UP",
                    payload={"email": str(email)},
                ),
                next_path,
            )

        # Normalise, then validate the email
        email = email.strip().lower()
        try:
            validate_email(email)
        except ValidationError:
            return self._error_redirect(
                request,
                AuthenticationException(
                    error_code=AUTHENTICATION_ERROR_CODES[
                        "INVALID_EMAIL_SIGN_UP"
                    ],
                    error_message="INVALID_EMAIL_SIGN_UP",
                    payload={"email": str(email)},
                ),
                next_path,
            )

        # Sign-up is refused when the email already belongs to a user; a
        # deactivated account is reported with its own, more specific code.
        existing_user = User.objects.filter(email=email).first()
        if existing_user:
            if not existing_user.is_active:
                return self._error_redirect(
                    request,
                    AuthenticationException(
                        error_code=AUTHENTICATION_ERROR_CODES[
                            "USER_ACCOUNT_DEACTIVATED"
                        ],
                        error_message="USER_ACCOUNT_DEACTIVATED",
                    ),
                    next_path,
                )
            return self._error_redirect(
                request,
                AuthenticationException(
                    error_code=AUTHENTICATION_ERROR_CODES[
                        "USER_ALREADY_EXIST"
                    ],
                    error_message="USER_ALREADY_EXIST",
                    payload={"email": str(email)},
                ),
                next_path,
            )

        try:
            provider = EmailProvider(
                request=request, key=email, code=password, is_signup=True
            )
            user = provider.authenticate()
            # Login the user and record their device info
            user_login(request=request, user=user, is_space=True)
        except AuthenticationException as e:
            return self._error_redirect(request, e, next_path)

        # Redirect to the (validated) next path; "" resolves to the space host
        url = urljoin(
            base_host(request=request, is_space=True),
            next_path,
        )
        return HttpResponseRedirect(url)