* refactor: enhance URL validation and redirection logic in authentication views * Updated authentication views (SignInAuthSpaceEndpoint, GitHubCallbackSpaceEndpoint, GitLabCallbackSpaceEndpoint, GoogleCallbackSpaceEndpoint, and MagicSignInSpaceEndpoint) to include url_has_allowed_host_and_scheme checks for safer redirection. * Improved URL construction by ensuring proper formatting and fallback to base host when necessary. * Added get_allowed_hosts function to path_validator.py for better host validation. * refactor: improve comments and clean up code in path_validator.py * Updated comments for clarity in the get_safe_redirect_url function. * Removed unnecessary blank line to enhance
110 lines
4.3 KiB
Python
110 lines
4.3 KiB
Python
# Python imports
|
|
import uuid
|
|
|
|
# Django imports
|
|
from django.http import HttpResponseRedirect
|
|
from django.views import View
|
|
from django.utils.http import url_has_allowed_host_and_scheme
|
|
|
|
# Module imports
|
|
from plane.authentication.provider.oauth.github import GitHubOAuthProvider
|
|
from plane.authentication.utils.login import user_login
|
|
from plane.license.models import Instance
|
|
from plane.authentication.utils.host import base_host
|
|
from plane.authentication.adapter.error import (
|
|
AUTHENTICATION_ERROR_CODES,
|
|
AuthenticationException,
|
|
)
|
|
from plane.utils.path_validator import get_safe_redirect_url, validate_next_path, get_allowed_hosts
|
|
|
|
|
|
class GitHubOauthInitiateSpaceEndpoint(View):
    """Start the GitHub OAuth flow for the space app."""

    def get(self, request):
        """Redirect the browser to the GitHub authorization URL.

        The host, the optional ``next_path`` query parameter and a freshly
        generated ``state`` value are stored in the session. If the instance
        is not set up, or the provider raises ``AuthenticationException``,
        the user is redirected to the URL built by ``get_safe_redirect_url``
        with the error details as params.
        """
        # Get host and next path
        request.session["host"] = base_host(request=request, is_space=True)
        next_path = request.GET.get("next_path")
        if next_path:
            # The callback view reads session["next_path"]; without storing it
            # here the post-login redirect target would be lost.
            request.session["next_path"] = str(next_path)

        # Check instance configuration
        instance = Instance.objects.first()
        if instance is None or not instance.is_setup_done:
            exc = AuthenticationException(
                error_code=AUTHENTICATION_ERROR_CODES["INSTANCE_NOT_CONFIGURED"],
                error_message="INSTANCE_NOT_CONFIGURED",
            )
            params = exc.get_error_dict()
            url = get_safe_redirect_url(
                base_url=base_host(request=request, is_space=True),
                next_path=next_path,
                params=params,
            )
            return HttpResponseRedirect(url)

        try:
            # The state value is kept in the session so the callback can
            # compare it with the one echoed back in the query string.
            state = uuid.uuid4().hex
            provider = GitHubOAuthProvider(request=request, state=state)
            request.session["state"] = state
            auth_url = provider.get_auth_url()
            return HttpResponseRedirect(auth_url)
        except AuthenticationException as e:
            params = e.get_error_dict()
            url = get_safe_redirect_url(
                base_url=base_host(request=request, is_space=True),
                next_path=next_path,
                params=params,
            )
            return HttpResponseRedirect(url)
|
|
|
|
|
|
class GitHubCallbackSpaceEndpoint(View):
    """Handle the redirect back from GitHub for the space app."""

    def get(self, request):
        """Complete the GitHub OAuth flow and redirect the user.

        Reads ``code`` and ``state`` from the query string and ``next_path``
        from the session. When the state does not match the one stored in the
        session, or no code is present, the user is redirected with a
        ``GITHUB_OAUTH_PROVIDER_ERROR``. Otherwise the user is authenticated,
        logged in and redirected to the validated ``next_path`` on the space
        host, falling back to the bare host when the resulting URL is not
        allowed. An ``AuthenticationException`` raised during authentication
        results in a redirect carrying the error params.
        """
        code = request.GET.get("code")
        state = request.GET.get("state")
        next_path = request.session.get("next_path")
        # NOTE: no local may be named ``base_host`` in this method. Binding
        # one (as the previous ``base_host = request.session.get("host")``
        # did) shadows the imported helper and turns every
        # ``base_host(request=..., is_space=True)`` call below into a
        # TypeError. That session value was never used, so it is not read.

        # A state mismatch and a missing code are reported with the same
        # error, so both guards share a single branch.
        if state != request.session.get("state", "") or not code:
            exc = AuthenticationException(
                error_code=AUTHENTICATION_ERROR_CODES["GITHUB_OAUTH_PROVIDER_ERROR"],
                error_message="GITHUB_OAUTH_PROVIDER_ERROR",
            )
            url = get_safe_redirect_url(
                base_url=base_host(request=request, is_space=True),
                next_path=next_path,
                params=exc.get_error_dict(),
            )
            return HttpResponseRedirect(url)

        try:
            provider = GitHubOAuthProvider(request=request, code=code)
            user = provider.authenticate()
            # Login the user and record his device info
            user_login(request=request, user=user, is_space=True)
            # Redirect to the requested path, but only if the final URL
            # points at an allowed host; otherwise go to the space host.
            next_path = validate_next_path(next_path=next_path)
            space_host = base_host(request=request, is_space=True)
            url = f"{space_host.rstrip('/')}{next_path}"
            if url_has_allowed_host_and_scheme(url, allowed_hosts=get_allowed_hosts()):
                return HttpResponseRedirect(url)
            return HttpResponseRedirect(space_host)
        except AuthenticationException as e:
            url = get_safe_redirect_url(
                base_url=base_host(request=request, is_space=True),
                next_path=next_path,
                params=e.get_error_dict(),
            )
            return HttpResponseRedirect(url)
|