bb-plane-fork/apps/api/plane/authentication/views/space/magic.py
sriram veeraghanta 02d0ee3e0f
chore: add copyright (#8584)
* feat: adding new copyright info on all files

* chore: adding CI
2026-01-27 13:54:22 +05:30

170 lines
6.9 KiB
Python

# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
# Django imports
from django.core.validators import validate_email
from django.http import HttpResponseRedirect
from django.views import View
from django.utils.http import url_has_allowed_host_and_scheme
# Third party imports
from rest_framework import status
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView
# Module imports
from plane.authentication.provider.credentials.magic_code import MagicCodeProvider
from plane.authentication.utils.login import user_login
from plane.bgtasks.magic_link_code_task import magic_link
from plane.license.models import Instance
from plane.authentication.utils.host import base_host
from plane.db.models import User
from plane.authentication.adapter.error import (
AuthenticationException,
AUTHENTICATION_ERROR_CODES,
)
from plane.utils.path_validator import get_safe_redirect_url, validate_next_path, get_allowed_hosts
class MagicGenerateSpaceEndpoint(APIView):
permission_classes = [AllowAny]
def post(self, request):
# Check if instance is configured
instance = Instance.objects.first()
if instance is None or not instance.is_setup_done:
exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["INSTANCE_NOT_CONFIGURED"],
error_message="INSTANCE_NOT_CONFIGURED",
)
return Response(exc.get_error_dict(), status=status.HTTP_400_BAD_REQUEST)
email = request.data.get("email", "").strip().lower()
try:
validate_email(email)
adapter = MagicCodeProvider(request=request, key=email)
key, token = adapter.initiate()
# If the smtp is configured send through here
magic_link.delay(email, key, token)
return Response({"key": str(key)}, status=status.HTTP_200_OK)
except AuthenticationException as e:
return Response(e.get_error_dict(), status=status.HTTP_400_BAD_REQUEST)
class MagicSignInSpaceEndpoint(View):
def post(self, request):
# set the referer as session to redirect after login
code = request.POST.get("code", "").strip()
email = request.POST.get("email", "").strip().lower()
next_path = request.POST.get("next_path")
if code == "" or email == "":
exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["MAGIC_SIGN_IN_EMAIL_CODE_REQUIRED"],
error_message="MAGIC_SIGN_IN_EMAIL_CODE_REQUIRED",
)
params = exc.get_error_dict()
url = get_safe_redirect_url(
base_url=base_host(request=request, is_space=True),
next_path=next_path,
params=params,
)
return HttpResponseRedirect(url)
existing_user = User.objects.filter(email=email).first()
if not existing_user:
exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["USER_DOES_NOT_EXIST"],
error_message="USER_DOES_NOT_EXIST",
)
params = exc.get_error_dict()
url = get_safe_redirect_url(
base_url=base_host(request=request, is_space=True),
next_path=next_path,
params=params,
)
return HttpResponseRedirect(url)
# Active User
try:
provider = MagicCodeProvider(request=request, key=f"magic_{email}", code=code)
user = provider.authenticate()
# Login the user and record his device info
user_login(request=request, user=user, is_space=True)
# redirect to referer path
next_path = validate_next_path(next_path=next_path)
url = f"{base_host(request=request, is_space=True).rstrip('/')}{next_path}"
if url_has_allowed_host_and_scheme(url, allowed_hosts=get_allowed_hosts()):
return HttpResponseRedirect(url)
else:
return HttpResponseRedirect(base_host(request=request, is_space=True))
except AuthenticationException as e:
params = e.get_error_dict()
url = get_safe_redirect_url(
base_url=base_host(request=request, is_space=True),
next_path=next_path,
params=params,
)
return HttpResponseRedirect(url)
class MagicSignUpSpaceEndpoint(View):
def post(self, request):
# set the referer as session to redirect after login
code = request.POST.get("code", "").strip()
email = request.POST.get("email", "").strip().lower()
next_path = request.POST.get("next_path")
if code == "" or email == "":
exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["MAGIC_SIGN_UP_EMAIL_CODE_REQUIRED"],
error_message="MAGIC_SIGN_UP_EMAIL_CODE_REQUIRED",
)
params = exc.get_error_dict()
url = get_safe_redirect_url(
base_url=base_host(request=request, is_space=True),
next_path=next_path,
params=params,
)
return HttpResponseRedirect(url)
# Existing User
existing_user = User.objects.filter(email=email).first()
# Already existing
if existing_user:
exc = AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["USER_ALREADY_EXIST"],
error_message="USER_ALREADY_EXIST",
)
params = exc.get_error_dict()
url = get_safe_redirect_url(
base_url=base_host(request=request, is_space=True),
next_path=next_path,
params=params,
)
return HttpResponseRedirect(url)
try:
provider = MagicCodeProvider(request=request, key=f"magic_{email}", code=code)
user = provider.authenticate()
# Login the user and record his device info
user_login(request=request, user=user, is_space=True)
# redirect to referer path
next_path = validate_next_path(next_path=next_path)
url = f"{base_host(request=request, is_space=True).rstrip('/')}{next_path}"
if url_has_allowed_host_and_scheme(url, allowed_hosts=get_allowed_hosts()):
return HttpResponseRedirect(url)
else:
return HttpResponseRedirect(base_host(request=request, is_space=True))
except AuthenticationException as e:
params = e.get_error_dict()
url = get_safe_redirect_url(
base_url=base_host(request=request, is_space=True),
next_path=next_path,
params=params,
)
return HttpResponseRedirect(url)