* fix: next path url redirection * fix: enhance URL redirection safety in authentication views Updated SignInAuthSpaceEndpoint, GitHubCallbackSpaceEndpoint, GitLabCallbackSpaceEndpoint, and GoogleCallbackSpaceEndpoint to include checks for allowed hosts and schemes before redirecting. This improves the security of URL redirection by ensuring only valid URLs are used. * chore: updated uitl to handle double / --------- Co-authored-by: pablohashescobar <nikhilschacko@gmail.com> Co-authored-by: Nikhil <118773738+pablohashescobar@users.noreply.github.com>
215 lines
8.5 KiB
Python
215 lines
8.5 KiB
Python
# Django imports
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.validators import validate_email
|
|
from django.http import HttpResponseRedirect
|
|
from django.views import View
|
|
from django.utils.http import url_has_allowed_host_and_scheme
|
|
|
|
# Module imports
|
|
from plane.authentication.provider.credentials.email import EmailProvider
|
|
from plane.authentication.utils.login import user_login
|
|
from plane.license.models import Instance
|
|
from plane.authentication.utils.host import base_host
|
|
from plane.db.models import User
|
|
from plane.authentication.adapter.error import (
|
|
AUTHENTICATION_ERROR_CODES,
|
|
AuthenticationException,
|
|
)
|
|
from plane.utils.path_validator import get_safe_redirect_url, validate_next_path, get_allowed_hosts
|
|
|
|
|
|
class SignInAuthSpaceEndpoint(View):
|
|
def post(self, request):
|
|
next_path = request.POST.get("next_path")
|
|
# Check instance configuration
|
|
instance = Instance.objects.first()
|
|
if instance is None or not instance.is_setup_done:
|
|
# Redirection params
|
|
exc = AuthenticationException(
|
|
error_code=AUTHENTICATION_ERROR_CODES["INSTANCE_NOT_CONFIGURED"],
|
|
error_message="INSTANCE_NOT_CONFIGURED",
|
|
)
|
|
params = exc.get_error_dict()
|
|
url = get_safe_redirect_url(
|
|
base_url=base_host(request=request, is_space=True),
|
|
next_path=next_path,
|
|
params=params
|
|
)
|
|
return HttpResponseRedirect(url)
|
|
|
|
# set the referer as session to redirect after login
|
|
email = request.POST.get("email", False)
|
|
password = request.POST.get("password", False)
|
|
|
|
## Raise exception if any of the above are missing
|
|
if not email or not password:
|
|
exc = AuthenticationException(
|
|
error_code=AUTHENTICATION_ERROR_CODES[
|
|
"REQUIRED_EMAIL_PASSWORD_SIGN_IN"
|
|
],
|
|
error_message="REQUIRED_EMAIL_PASSWORD_SIGN_IN",
|
|
payload={"email": str(email)},
|
|
)
|
|
params = exc.get_error_dict()
|
|
url = get_safe_redirect_url(
|
|
base_url=base_host(request=request, is_space=True),
|
|
next_path=next_path,
|
|
params=params
|
|
)
|
|
return HttpResponseRedirect(url)
|
|
|
|
# Validate email
|
|
email = email.strip().lower()
|
|
try:
|
|
validate_email(email)
|
|
except ValidationError:
|
|
exc = AuthenticationException(
|
|
error_code=AUTHENTICATION_ERROR_CODES["INVALID_EMAIL_SIGN_IN"],
|
|
error_message="INVALID_EMAIL_SIGN_IN",
|
|
payload={"email": str(email)},
|
|
)
|
|
params = exc.get_error_dict()
|
|
url = get_safe_redirect_url(
|
|
base_url=base_host(request=request, is_space=True),
|
|
next_path=next_path,
|
|
params=params
|
|
)
|
|
return HttpResponseRedirect(url)
|
|
|
|
# Existing User
|
|
existing_user = User.objects.filter(email=email).first()
|
|
|
|
if not existing_user:
|
|
exc = AuthenticationException(
|
|
error_code=AUTHENTICATION_ERROR_CODES["USER_DOES_NOT_EXIST"],
|
|
error_message="USER_DOES_NOT_EXIST",
|
|
payload={"email": str(email)},
|
|
)
|
|
params = exc.get_error_dict()
|
|
url = get_safe_redirect_url(
|
|
base_url=base_host(request=request, is_space=True),
|
|
next_path=next_path,
|
|
params=params
|
|
)
|
|
return HttpResponseRedirect(url)
|
|
|
|
try:
|
|
provider = EmailProvider(
|
|
request=request, key=email, code=password, is_signup=False
|
|
)
|
|
user = provider.authenticate()
|
|
# Login the user and record his device info
|
|
user_login(request=request, user=user, is_space=True)
|
|
# redirect to referer path
|
|
next_path = validate_next_path(next_path=next_path)
|
|
url = f"{base_host(request=request, is_space=True).rstrip('/')}{next_path}"
|
|
if url_has_allowed_host_and_scheme(url, allowed_hosts=get_allowed_hosts()):
|
|
return HttpResponseRedirect(url)
|
|
else:
|
|
return HttpResponseRedirect(base_host(request=request, is_space=True))
|
|
except AuthenticationException as e:
|
|
params = e.get_error_dict()
|
|
url = get_safe_redirect_url(
|
|
base_url=base_host(request=request, is_space=True),
|
|
next_path=next_path,
|
|
params=params
|
|
)
|
|
return HttpResponseRedirect(url)
|
|
|
|
|
|
class SignUpAuthSpaceEndpoint(View):
|
|
def post(self, request):
|
|
next_path = request.POST.get("next_path")
|
|
# Check instance configuration
|
|
instance = Instance.objects.first()
|
|
if instance is None or not instance.is_setup_done:
|
|
# Redirection params
|
|
exc = AuthenticationException(
|
|
error_code=AUTHENTICATION_ERROR_CODES["INSTANCE_NOT_CONFIGURED"],
|
|
error_message="INSTANCE_NOT_CONFIGURED",
|
|
)
|
|
params = exc.get_error_dict()
|
|
url = get_safe_redirect_url(
|
|
base_url=base_host(request=request, is_space=True),
|
|
next_path=next_path,
|
|
params=params
|
|
)
|
|
return HttpResponseRedirect(url)
|
|
|
|
email = request.POST.get("email", False)
|
|
password = request.POST.get("password", False)
|
|
## Raise exception if any of the above are missing
|
|
if not email or not password:
|
|
# Redirection params
|
|
exc = AuthenticationException(
|
|
error_code=AUTHENTICATION_ERROR_CODES[
|
|
"REQUIRED_EMAIL_PASSWORD_SIGN_UP"
|
|
],
|
|
error_message="REQUIRED_EMAIL_PASSWORD_SIGN_UP",
|
|
payload={"email": str(email)},
|
|
)
|
|
params = exc.get_error_dict()
|
|
url = get_safe_redirect_url(
|
|
base_url=base_host(request=request, is_space=True),
|
|
next_path=next_path,
|
|
params=params
|
|
)
|
|
return HttpResponseRedirect(url)
|
|
# Validate the email
|
|
email = email.strip().lower()
|
|
try:
|
|
validate_email(email)
|
|
except ValidationError:
|
|
# Redirection params
|
|
exc = AuthenticationException(
|
|
error_code=AUTHENTICATION_ERROR_CODES["INVALID_EMAIL_SIGN_UP"],
|
|
error_message="INVALID_EMAIL_SIGN_UP",
|
|
payload={"email": str(email)},
|
|
)
|
|
params = exc.get_error_dict()
|
|
url = get_safe_redirect_url(
|
|
base_url=base_host(request=request, is_space=True),
|
|
next_path=next_path,
|
|
params=params
|
|
)
|
|
return HttpResponseRedirect(url)
|
|
|
|
# Existing User
|
|
existing_user = User.objects.filter(email=email).first()
|
|
|
|
if existing_user:
|
|
exc = AuthenticationException(
|
|
error_code=AUTHENTICATION_ERROR_CODES["USER_ALREADY_EXIST"],
|
|
error_message="USER_ALREADY_EXIST",
|
|
payload={"email": str(email)},
|
|
)
|
|
params = exc.get_error_dict()
|
|
url = get_safe_redirect_url(
|
|
base_url=base_host(request=request, is_space=True),
|
|
next_path=next_path,
|
|
params=params
|
|
)
|
|
return HttpResponseRedirect(url)
|
|
|
|
try:
|
|
provider = EmailProvider(
|
|
request=request, key=email, code=password, is_signup=True
|
|
)
|
|
user = provider.authenticate()
|
|
# Login the user and record his device info
|
|
user_login(request=request, user=user, is_space=True)
|
|
# redirect to referer path
|
|
next_path = validate_next_path(next_path=next_path)
|
|
url = f"{base_host(request=request, is_space=True).rstrip('/')}{next_path}"
|
|
if url_has_allowed_host_and_scheme(url, allowed_hosts=get_allowed_hosts()):
|
|
return HttpResponseRedirect(url)
|
|
else:
|
|
return HttpResponseRedirect(base_host(request=request, is_space=True))
|
|
except AuthenticationException as e:
|
|
params = e.get_error_dict()
|
|
url = get_safe_redirect_url(
|
|
base_url=base_host(request=request, is_space=True),
|
|
next_path=next_path,
|
|
params=params
|
|
)
|
|
return HttpResponseRedirect(url)
|