169 lines
6.6 KiB
Python
169 lines
6.6 KiB
Python
import os
|
|
from datetime import datetime, timedelta
|
|
from urllib.parse import urlencode, urlparse
|
|
import pytz
|
|
import requests
|
|
|
|
# Module imports
|
|
from plane.authentication.adapter.oauth import OauthAdapter
|
|
from plane.license.utils.instance_value import get_configuration_value
|
|
from plane.authentication.adapter.error import (
|
|
AUTHENTICATION_ERROR_CODES,
|
|
AuthenticationException,
|
|
)
|
|
|
|
|
|
class GiteaOAuthProvider(OauthAdapter):
    """OAuth adapter for signing in through a Gitea instance.

    The authorize, token and user-info URLs are all derived from the
    configured ``GITEA_HOST``. Token and profile payloads returned by Gitea
    are reshaped into the dictionaries handed to the ``OauthAdapter`` base
    class via ``set_token_data`` / ``set_user_data``.
    """

    provider = "gitea"
    scope = "openid email profile"

    # Upper bound, in seconds, for the follow-up e-mail lookup. Without an
    # explicit timeout ``requests`` waits indefinitely, so an unresponsive
    # Gitea host would hang the login request.
    EMAIL_REQUEST_TIMEOUT = 10

    def __init__(self, request, code=None, state=None, callback=None):
        """Resolve the Gitea configuration and initialise the base adapter.

        Args:
            request: Incoming request; only ``is_secure()`` and ``get_host()``
                are used here, to build the redirect URI.
            code: Authorization code, forwarded to the base adapter.
            state: Opaque value placed in the ``state`` query parameter of
                the authorize URL.
            callback: Forwarded unchanged to the base adapter.

        Raises:
            AuthenticationException: ``GITEA_NOT_CONFIGURED`` when the client
                id, client secret or host is missing, or when the host is not
                an absolute ``http``/``https`` URL.
        """
        (GITEA_CLIENT_ID, GITEA_CLIENT_SECRET, GITEA_HOST) = get_configuration_value(
            [
                {
                    "key": "GITEA_CLIENT_ID",
                    "default": os.environ.get("GITEA_CLIENT_ID"),
                },
                {
                    "key": "GITEA_CLIENT_SECRET",
                    "default": os.environ.get("GITEA_CLIENT_SECRET"),
                },
                {
                    "key": "GITEA_HOST",
                    "default": os.environ.get("GITEA_HOST"),
                },
            ]
        )

        if not (GITEA_CLIENT_ID and GITEA_CLIENT_SECRET and GITEA_HOST):
            raise AuthenticationException(
                error_code=AUTHENTICATION_ERROR_CODES["GITEA_NOT_CONFIGURED"],
                error_message="GITEA_NOT_CONFIGURED",
            )

        # Enforce an http(s) scheme AND a host component. A scheme-only value
        # such as "https://" would otherwise pass and yield endpoint URLs
        # like "https:/login/oauth/authorize".
        parsed = urlparse(GITEA_HOST)
        if parsed.scheme not in ("https", "http") or not parsed.netloc:
            raise AuthenticationException(
                error_code=AUTHENTICATION_ERROR_CODES["GITEA_NOT_CONFIGURED"],
                error_message="GITEA_NOT_CONFIGURED",  # avoid leaking details to query params
            )
        # Normalise trailing slash(es) so the joins below never produce "//".
        GITEA_HOST = GITEA_HOST.rstrip("/")

        # Set URLs based on the host
        self.token_url = f"{GITEA_HOST}/login/oauth/access_token"
        self.userinfo_url = f"{GITEA_HOST}/api/v1/user"

        client_id = GITEA_CLIENT_ID
        client_secret = GITEA_CLIENT_SECRET

        redirect_uri = f"{'https' if request.is_secure() else 'http'}://{request.get_host()}/auth/gitea/callback/"
        url_params = {
            "client_id": client_id,
            "scope": self.scope,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "state": state,
        }
        auth_url = f"{GITEA_HOST}/login/oauth/authorize?{urlencode(url_params)}"

        super().__init__(
            request,
            self.provider,
            client_id,
            self.scope,
            redirect_uri,
            auth_url,
            self.token_url,
            self.userinfo_url,
            client_secret,
            code,
            callback=callback,
        )

    def set_token_data(self):
        """Exchange the authorization code and store the resulting tokens.

        The exchange itself is delegated to the inherited ``get_user_token``;
        this method only builds the request payload and reshapes the reply.
        ``expires_in`` is read as a number of seconds from now, while
        ``refresh_token_expired_at`` is read as a Unix timestamp. Either
        expiry is stored as ``None`` when the field is absent or falsy.
        """
        data = {
            "code": self.code,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "redirect_uri": self.redirect_uri,
            "grant_type": "authorization_code",
        }
        headers = {"Accept": "application/json"}
        token_response = self.get_user_token(data=data, headers=headers)

        expires_in = token_response.get("expires_in")
        refresh_expires_at = token_response.get("refresh_token_expired_at")

        super().set_token_data(
            {
                "access_token": token_response.get("access_token"),
                "refresh_token": token_response.get("refresh_token"),
                "access_token_expired_at": (
                    datetime.now(tz=pytz.utc) + timedelta(seconds=expires_in) if expires_in else None
                ),
                "refresh_token_expired_at": (
                    datetime.fromtimestamp(refresh_expires_at, tz=pytz.utc) if refresh_expires_at else None
                ),
                "id_token": token_response.get("id_token", ""),
            }
        )

    def __get_email(self, headers):
        """Fetch the user's e-mail from ``<userinfo_url>/emails``.

        Selection order: primary and verified, then any verified, then any
        primary, and finally the first entry in the list.

        Args:
            headers: HTTP headers (including authorization) for the request.

        Returns:
            The selected address, or ``None`` if the chosen entries carry no
            ``email`` value.

        Raises:
            AuthenticationException: ``GITEA_OAUTH_PROVIDER_ERROR`` when the
                request fails or times out, the response status is not OK,
                the body is not valid JSON, or no e-mail entries are returned.
        """
        # Gitea may not provide email in user response, so fetch it separately
        emails_url = f"{self.userinfo_url}/emails"
        try:
            response = requests.get(
                emails_url,
                headers=headers,
                timeout=self.EMAIL_REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            raise AuthenticationException(
                error_code=AUTHENTICATION_ERROR_CODES["GITEA_OAUTH_PROVIDER_ERROR"],
                error_message="GITEA_OAUTH_PROVIDER_ERROR: Exception occurred while fetching emails",
            ) from exc

        if not response.ok:
            raise AuthenticationException(
                error_code=AUTHENTICATION_ERROR_CODES["GITEA_OAUTH_PROVIDER_ERROR"],
                error_message="GITEA_OAUTH_PROVIDER_ERROR: Failed to fetch emails",
            )

        try:
            emails_response = response.json()
        except (requests.RequestException, ValueError) as exc:
            # Older ``requests`` releases surface a plain ValueError for an
            # undecodable body; newer ones raise a RequestException subclass.
            raise AuthenticationException(
                error_code=AUTHENTICATION_ERROR_CODES["GITEA_OAUTH_PROVIDER_ERROR"],
                error_message="GITEA_OAUTH_PROVIDER_ERROR: Exception occurred while fetching emails",
            ) from exc

        # An empty list, or any payload that is not a list of entries, means
        # there is nothing to choose from.
        if not isinstance(emails_response, list) or not emails_response:
            raise AuthenticationException(
                error_code=AUTHENTICATION_ERROR_CODES["GITEA_OAUTH_PROVIDER_ERROR"],
                error_message="GITEA_OAUTH_PROVIDER_ERROR: No emails found",
            )

        # Prefer primary+verified, then any verified, then primary, else first
        preference_order = (
            lambda entry: entry.get("primary") and entry.get("verified"),
            lambda entry: entry.get("verified"),
            lambda entry: entry.get("primary"),
        )
        for is_preferred in preference_order:
            email = next(
                (entry.get("email") for entry in emails_response if is_preferred(entry)),
                None,
            )
            if email:
                return email

        # No flagged entry yielded an address; fall back to the first one.
        return emails_response[0].get("email")

    def set_user_data(self):
        """Load the Gitea profile and store the normalised user record.

        The profile comes from the inherited ``get_user_response``. When it
        carries no ``email`` value, the address is looked up separately via
        the ``/emails`` endpoint using the stored access token.
        """
        user_info_response = self.get_user_response()
        headers = {
            "Authorization": f"Bearer {self.token_data.get('access_token')}",
            "Accept": "application/json",
        }

        # Get email if not provided in user info
        email = user_info_response.get("email")
        if not email:
            email = self.__get_email(headers=headers)

        super().set_user_data(
            {
                "email": email,
                "user": {
                    "provider_id": str(user_info_response.get("id")),
                    "email": email,
                    "avatar": user_info_response.get("avatar_url"),
                    "first_name": user_info_response.get("full_name") or user_info_response.get("login"),
                    "last_name": "",  # Gitea doesn't provide separate first/last name
                    "is_password_autoset": True,
                },
            }
        )
|