bb-plane-fork/apps/api/plane/authentication/provider/credentials/magic_code.py
2025-07-04 15:32:21 +05:30

158 lines
5.8 KiB
Python

# Python imports
import json
import os
import random
import string
# Module imports
from plane.authentication.adapter.credential import CredentialAdapter
from plane.license.utils.instance_value import get_configuration_value
from plane.settings.redis import redis_instance
from plane.authentication.adapter.error import (
AUTHENTICATION_ERROR_CODES,
AuthenticationException,
)
from plane.db.models import User
class MagicCodeProvider(CredentialAdapter):
provider = "magic-code"
def __init__(self, request, key, code=None, callback=None):
(EMAIL_HOST, ENABLE_MAGIC_LINK_LOGIN) = get_configuration_value(
[
{"key": "EMAIL_HOST", "default": os.environ.get("EMAIL_HOST")},
{
"key": "ENABLE_MAGIC_LINK_LOGIN",
"default": os.environ.get("ENABLE_MAGIC_LINK_LOGIN", "1"),
},
]
)
if not (EMAIL_HOST):
raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["SMTP_NOT_CONFIGURED"],
error_message="SMTP_NOT_CONFIGURED",
payload={"email": str(key)},
)
if ENABLE_MAGIC_LINK_LOGIN == "0":
raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["MAGIC_LINK_LOGIN_DISABLED"],
error_message="MAGIC_LINK_LOGIN_DISABLED",
payload={"email": str(key)},
)
super().__init__(request=request, provider=self.provider, callback=callback)
self.key = key
self.code = code
def initiate(self):
## Generate a random token
token = (
"".join(random.choices(string.ascii_lowercase, k=4))
+ "-"
+ "".join(random.choices(string.ascii_lowercase, k=4))
+ "-"
+ "".join(random.choices(string.ascii_lowercase, k=4))
)
ri = redis_instance()
key = "magic_" + str(self.key)
# Check if the key already exists in python
if ri.exists(key):
data = json.loads(ri.get(key))
current_attempt = data["current_attempt"] + 1
if data["current_attempt"] > 2:
email = str(self.key).replace("magic_", "", 1)
if User.objects.filter(email=email).exists():
raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES[
"EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_IN"
],
error_message="EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_IN",
payload={"email": str(email)},
)
else:
raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES[
"EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_UP"
],
error_message="EMAIL_CODE_ATTEMPT_EXHAUSTED_SIGN_UP",
payload={"email": self.key},
)
value = {
"current_attempt": current_attempt,
"email": str(self.key),
"token": token,
}
expiry = 600
ri.set(key, json.dumps(value), ex=expiry)
else:
value = {"current_attempt": 0, "email": self.key, "token": token}
expiry = 600
ri.set(key, json.dumps(value), ex=expiry)
return key, token
def set_user_data(self):
ri = redis_instance()
if ri.exists(self.key):
data = json.loads(ri.get(self.key))
token = data["token"]
email = data["email"]
if str(token) == str(self.code):
super().set_user_data(
{
"email": email,
"user": {
"avatar": "",
"first_name": "",
"last_name": "",
"provider_id": "",
"is_password_autoset": True,
},
}
)
# Delete the token from redis if the code match is successful
ri.delete(self.key)
return
else:
email = str(self.key).replace("magic_", "", 1)
if User.objects.filter(email=email).exists():
raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES[
"INVALID_MAGIC_CODE_SIGN_IN"
],
error_message="INVALID_MAGIC_CODE_SIGN_IN",
payload={"email": str(email)},
)
else:
raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES[
"INVALID_MAGIC_CODE_SIGN_UP"
],
error_message="INVALID_MAGIC_CODE_SIGN_UP",
payload={"email": str(email)},
)
else:
email = str(self.key).replace("magic_", "", 1)
if User.objects.filter(email=email).exists():
raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["EXPIRED_MAGIC_CODE_SIGN_IN"],
error_message="EXPIRED_MAGIC_CODE_SIGN_IN",
payload={"email": str(email)},
)
else:
raise AuthenticationException(
error_code=AUTHENTICATION_ERROR_CODES["EXPIRED_MAGIC_CODE_SIGN_UP"],
error_message="EXPIRED_MAGIC_CODE_SIGN_UP",
payload={"email": str(email)},
)