[WEB-5237] feat: add workspace invitation and project member management endpoints (#8059)

This commit is contained in:
Nikhil 2025-11-04 14:56:21 +05:30 committed by GitHub
parent 96bbbec588
commit 3c6f24de64
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 982 additions and 17 deletions

View file

@ -53,3 +53,5 @@ from .asset import (
GenericAssetUpdateSerializer,
FileAssetSerializer,
)
from .invite import WorkspaceInviteSerializer
from .member import ProjectMemberSerializer

View file

@ -0,0 +1,56 @@
# Django imports
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from rest_framework import serializers
# Module imports
from plane.db.models import WorkspaceMemberInvite
from .base import BaseSerializer
from plane.app.permissions.base import ROLE
class WorkspaceInviteSerializer(BaseSerializer):
"""
Serializer for workspace invites.
"""
class Meta:
model = WorkspaceMemberInvite
fields = [
"id",
"email",
"role",
"created_at",
"updated_at",
"responded_at",
"accepted",
]
read_only_fields = [
"id",
"workspace",
"created_at",
"updated_at",
"responded_at",
"accepted",
]
def validate_email(self, value):
try:
validate_email(value)
except ValidationError:
raise serializers.ValidationError("Invalid email address", code="INVALID_EMAIL_ADDRESS")
return value
def validate_role(self, value):
if value not in [ROLE.ADMIN.value, ROLE.MEMBER.value, ROLE.GUEST.value]:
raise serializers.ValidationError("Invalid role", code="INVALID_WORKSPACE_MEMBER_ROLE")
return value
def validate(self, data):
slug = self.context["slug"]
if (
data.get("email")
and WorkspaceMemberInvite.objects.filter(email=data["email"], workspace__slug=slug).exists()
):
raise serializers.ValidationError("Email already invited", code="EMAIL_ALREADY_INVITED")
return data

View file

@ -0,0 +1,39 @@
# Third party imports
from rest_framework import serializers
# Module imports
from plane.db.models import ProjectMember, WorkspaceMember
from .base import BaseSerializer
from plane.db.models import User
from plane.utils.permissions import ROLE
class ProjectMemberSerializer(BaseSerializer):
"""
Serializer for project members.
"""
member = serializers.PrimaryKeyRelatedField(
queryset=User.objects.all(),
required=True,
)
def validate_member(self, value):
slug = self.context.get("slug")
if not slug:
raise serializers.ValidationError("Slug is required", code="INVALID_SLUG")
if not value:
raise serializers.ValidationError("Member is required", code="INVALID_MEMBER")
if not WorkspaceMember.objects.filter(workspace__slug=slug, member=value).exists():
raise serializers.ValidationError("Member not found in workspace", code="INVALID_MEMBER")
return value
def validate_role(self, value):
if value not in [ROLE.ADMIN.value, ROLE.MEMBER.value, ROLE.GUEST.value]:
raise serializers.ValidationError("Invalid role", code="INVALID_ROLE")
return value
class Meta:
model = ProjectMember
fields = ["id", "member", "role"]
read_only_fields = ["id"]

View file

@ -8,6 +8,7 @@ from .project import urlpatterns as project_patterns
from .state import urlpatterns as state_patterns
from .user import urlpatterns as user_patterns
from .work_item import urlpatterns as work_item_patterns
from .invite import urlpatterns as invite_patterns
urlpatterns = [
*asset_patterns,
@ -20,4 +21,5 @@ urlpatterns = [
*state_patterns,
*user_patterns,
*work_item_patterns,
*invite_patterns,
]

View file

@ -0,0 +1,18 @@
# Django imports
from django.urls import path, include
# Third party imports
from rest_framework.routers import DefaultRouter
# Module imports
from plane.api.views import WorkspaceInvitationsViewset
# Create router with just the invitations prefix (no workspace slug)
router = DefaultRouter()
router.register(r"invitations", WorkspaceInvitationsViewset, basename="workspace-invitations")
# Wrap the router URLs with the workspace slug path
urlpatterns = [
path("workspaces/<str:slug>/", include(router.urls)),
]

View file

@ -1,13 +1,29 @@
from django.urls import path
from plane.api.views import ProjectMemberAPIEndpoint, WorkspaceMemberAPIEndpoint
from plane.api.views import ProjectMemberListCreateAPIEndpoint, ProjectMemberDetailAPIEndpoint, WorkspaceMemberAPIEndpoint
urlpatterns = [
# Project members
path(
"workspaces/<str:slug>/projects/<uuid:project_id>/members/",
ProjectMemberAPIEndpoint.as_view(http_method_names=["get"]),
ProjectMemberListCreateAPIEndpoint.as_view(http_method_names=["get", "post"]),
name="project-members",
),
path(
"workspaces/<str:slug>/projects/<uuid:project_id>/members/<uuid:pk>/",
ProjectMemberDetailAPIEndpoint.as_view(http_method_names=["patch", "delete", "get"]),
name="project-member",
),
path(
"workspaces/<str:slug>/projects/<uuid:project_id>/project-members/",
ProjectMemberListCreateAPIEndpoint.as_view(http_method_names=["get", "post"]),
name="project-members",
),
path(
"workspaces/<str:slug>/projects/<uuid:project_id>/project-members/<uuid:pk>/",
ProjectMemberDetailAPIEndpoint.as_view(http_method_names=["patch", "delete", "get"]),
name="project-member",
),
path(
"workspaces/<str:slug>/members/",
WorkspaceMemberAPIEndpoint.as_view(http_method_names=["get"]),

View file

@ -43,7 +43,7 @@ from .module import (
ModuleArchiveUnarchiveAPIEndpoint,
)
from .member import ProjectMemberAPIEndpoint, WorkspaceMemberAPIEndpoint
from .member import ProjectMemberListCreateAPIEndpoint, ProjectMemberDetailAPIEndpoint, WorkspaceMemberAPIEndpoint
from .intake import (
IntakeIssueListCreateAPIEndpoint,
@ -53,3 +53,5 @@ from .intake import (
from .asset import UserAssetEndpoint, UserServerAssetEndpoint, GenericAssetEndpoint
from .user import UserEndpoint
from .invite import WorkspaceInvitationsViewset

View file

@ -1,5 +1,6 @@
# Python imports
import zoneinfo
import logging
# Django imports
from django.conf import settings
@ -7,15 +8,19 @@ from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import IntegrityError
from django.urls import resolve
from django.utils import timezone
from plane.db.models.api import APIToken
# Third party imports
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
# Third party imports
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.filters import SearchFilter
from rest_framework.viewsets import ModelViewSet
from rest_framework.exceptions import APIException
from rest_framework.generics import GenericAPIView
# Module imports
from plane.db.models.api import APIToken
from plane.api.middleware.api_authentication import APIKeyAuthentication
from plane.api.rate_limit import ApiKeyRateThrottle, ServiceTokenRateThrottle
from plane.utils.exception_logger import log_exception
@ -23,6 +28,9 @@ from plane.utils.paginator import BasePaginator
from plane.utils.core.mixins import ReadReplicaControlMixin
logger = logging.getLogger("plane.api")
class TimezoneMixin:
"""
This enables timezone conversion according
@ -152,3 +160,118 @@ class BaseAPIView(TimezoneMixin, GenericAPIView, ReadReplicaControlMixin, BasePa
def expand(self):
expand = [expand for expand in self.request.GET.get("expand", "").split(",") if expand]
return expand if expand else None
class BaseViewSet(TimezoneMixin, ReadReplicaControlMixin, ModelViewSet, BasePaginator):
model = None
authentication_classes = [APIKeyAuthentication]
permission_classes = [
IsAuthenticated,
]
use_read_replica = False
def get_queryset(self):
try:
return self.model.objects.all()
except Exception as e:
log_exception(e)
raise APIException("Please check the view", status.HTTP_400_BAD_REQUEST)
def handle_exception(self, exc):
"""
Handle any exception that occurs, by returning an appropriate response,
or re-raising the error.
"""
try:
response = super().handle_exception(exc)
return response
except Exception as e:
if isinstance(e, IntegrityError):
log_exception(e)
return Response(
{"error": "The payload is not valid"},
status=status.HTTP_400_BAD_REQUEST,
)
if isinstance(e, ValidationError):
logger.warning(
"Validation Error",
extra={
"error_code": "VALIDATION_ERROR",
"error_message": str(e),
},
)
return Response(
{"error": "Please provide valid detail"},
status=status.HTTP_400_BAD_REQUEST,
)
if isinstance(e, ObjectDoesNotExist):
logger.warning(
"Object Does Not Exist",
extra={
"error_code": "OBJECT_DOES_NOT_EXIST",
"error_message": str(e),
},
)
return Response(
{"error": "The required object does not exist."},
status=status.HTTP_404_NOT_FOUND,
)
if isinstance(e, KeyError):
logger.error(
"Key Error",
extra={
"error_code": "KEY_ERROR",
"error_message": str(e),
},
)
return Response(
{"error": "The required key does not exist."},
status=status.HTTP_400_BAD_REQUEST,
)
log_exception(e)
return Response(
{"error": "Something went wrong please try again later"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
def dispatch(self, request, *args, **kwargs):
try:
response = super().dispatch(request, *args, **kwargs)
if settings.DEBUG:
from django.db import connection
print(f"{request.method} - {request.get_full_path()} of Queries: {len(connection.queries)}")
return response
except Exception as exc:
response = self.handle_exception(exc)
return response
@property
def workspace_slug(self):
return self.kwargs.get("slug", None)
@property
def project_id(self):
project_id = self.kwargs.get("project_id", None)
if project_id:
return project_id
if resolve(self.request.path_info).url_name == "project":
return self.kwargs.get("pk", None)
@property
def fields(self):
fields = [field for field in self.request.GET.get("fields", "").split(",") if field]
return fields if fields else None
@property
def expand(self):
expand = [expand for expand in self.request.GET.get("expand", "").split(",") if expand]
return expand if expand else None

View file

@ -0,0 +1,150 @@
# Third party imports
from rest_framework.response import Response
from rest_framework import status
from drf_spectacular.utils import (
extend_schema,
OpenApiResponse,
OpenApiRequest,
OpenApiParameter,
OpenApiTypes,
)
# Module imports
from plane.api.views.base import BaseViewSet
from plane.db.models import WorkspaceMemberInvite, Workspace
from plane.api.serializers import WorkspaceInviteSerializer
from plane.utils.permissions import WorkspaceOwnerPermission
from plane.utils.openapi.parameters import WORKSPACE_SLUG_PARAMETER
class WorkspaceInvitationsViewset(BaseViewSet):
"""
Endpoint for creating, listing and deleting workspace invites.
"""
serializer_class = WorkspaceInviteSerializer
model = WorkspaceMemberInvite
permission_classes = [
WorkspaceOwnerPermission,
]
def get_queryset(self):
return self.filter_queryset(super().get_queryset().filter(workspace__slug=self.kwargs.get("slug")))
def get_object(self):
return self.get_queryset().get(pk=self.kwargs.get("pk"))
@extend_schema(
summary="List workspace invites",
description="List all workspace invites for a workspace",
responses={
200: OpenApiResponse(
description="Workspace invites",
response=WorkspaceInviteSerializer(many=True),
)
},
parameters=[
WORKSPACE_SLUG_PARAMETER,
],
)
def list(self, request, slug):
workspace_member_invites = self.get_queryset()
serializer = WorkspaceInviteSerializer(workspace_member_invites, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
@extend_schema(
summary="Get workspace invite",
description="Get a workspace invite by ID",
responses={200: OpenApiResponse(description="Workspace invite", response=WorkspaceInviteSerializer)},
parameters=[
WORKSPACE_SLUG_PARAMETER,
OpenApiParameter(
name="pk",
description="Workspace invite ID",
required=True,
type=OpenApiTypes.UUID,
location=OpenApiParameter.PATH,
),
],
)
def retrieve(self, request, slug, pk):
workspace_member_invite = self.get_object()
serializer = WorkspaceInviteSerializer(workspace_member_invite)
return Response(serializer.data, status=status.HTTP_200_OK)
@extend_schema(
summary="Create workspace invite",
description="Create a workspace invite",
responses={201: OpenApiResponse(description="Workspace invite", response=WorkspaceInviteSerializer)},
request=OpenApiRequest(request=WorkspaceInviteSerializer),
parameters=[
WORKSPACE_SLUG_PARAMETER,
],
)
def create(self, request, slug):
workspace = Workspace.objects.get(slug=slug)
serializer = WorkspaceInviteSerializer(data=request.data, context={"slug": slug})
serializer.is_valid(raise_exception=True)
serializer.save(workspace=workspace, created_by=request.user)
return Response(serializer.data, status=status.HTTP_201_CREATED)
@extend_schema(
summary="Update workspace invite",
description="Update a workspace invite",
responses={200: OpenApiResponse(description="Workspace invite", response=WorkspaceInviteSerializer)},
request=OpenApiRequest(request=WorkspaceInviteSerializer),
parameters=[
WORKSPACE_SLUG_PARAMETER,
OpenApiParameter(
name="pk",
description="Workspace invite ID",
required=True,
type=OpenApiTypes.UUID,
location=OpenApiParameter.PATH,
),
],
)
def partial_update(self, request, slug, pk):
workspace_member_invite = self.get_object()
if request.data.get("email"):
return Response(
status=status.HTTP_400_BAD_REQUEST,
data={"error": "Email cannot be updated after invite is created.", "code": "EMAIL_CANNOT_BE_UPDATED"},
)
serializer = WorkspaceInviteSerializer(
workspace_member_invite, data=request.data, partial=True, context={"slug": slug}
)
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
@extend_schema(
summary="Delete workspace invite",
description="Delete a workspace invite",
responses={204: OpenApiResponse(description="Workspace invite deleted")},
parameters=[
WORKSPACE_SLUG_PARAMETER,
OpenApiParameter(
name="pk",
description="Workspace invite ID",
required=True,
type=OpenApiTypes.UUID,
location=OpenApiParameter.PATH,
),
],
)
def destroy(self, request, slug, pk):
workspace_member_invite = self.get_object()
if workspace_member_invite.accepted:
return Response(
status=status.HTTP_400_BAD_REQUEST,
data={"error": "Invite already accepted", "code": "INVITE_ALREADY_ACCEPTED"},
)
if workspace_member_invite.responded_at:
return Response(
status=status.HTTP_400_BAD_REQUEST,
data={"error": "Invite already responded", "code": "INVITE_ALREADY_RESPONDED"},
)
workspace_member_invite.delete()
return Response(status=status.HTTP_204_NO_CONTENT)

View file

@ -1808,7 +1808,7 @@ class IssueAttachmentListCreateAPIEndpoint(BaseAPIView):
request.user.id,
project_id=project_id,
issue=issue,
allowed_roles=[ROLE.ADMIN.value, ROLE.MEMBER.value],
allowed_roles=[ROLE.ADMIN.value, ROLE.MEMBER.value, ROLE.GUEST.value],
allow_creator=True,
):
return Response(
@ -1961,10 +1961,10 @@ class IssueAttachmentDetailAPIEndpoint(BaseAPIView):
issue = Issue.objects.get(pk=issue_id, workspace__slug=slug, project_id=project_id)
# if the request user is creator or admin then delete the attachment
if not user_has_issue_permission(
request.user,
request.user.id,
project_id=project_id,
issue=issue,
allowed_roles=[ROLE.ADMIN.value],
allowed_roles=[ROLE.ADMIN.value, ROLE.MEMBER.value, ROLE.GUEST.value],
allow_creator=True,
):
return Response(
@ -2034,7 +2034,7 @@ class IssueAttachmentDetailAPIEndpoint(BaseAPIView):
"""
# if the user is part of the project then allow the download
if not user_has_issue_permission(
request.user,
request.user.id,
project_id=project_id,
issue=None,
allowed_roles=None,
@ -2099,10 +2099,10 @@ class IssueAttachmentDetailAPIEndpoint(BaseAPIView):
issue = Issue.objects.get(pk=issue_id, workspace__slug=slug, project_id=project_id)
# if the user is creator or admin then allow the upload
if not user_has_issue_permission(
request.user,
request.user.id,
project_id=project_id,
issue=issue,
allowed_roles=[ROLE.ADMIN.value, ROLE.MEMBER.value],
allowed_roles=[ROLE.ADMIN.value, ROLE.MEMBER.value, ROLE.GUEST.value],
allow_creator=True,
):
return Response(

View file

@ -4,13 +4,14 @@ from rest_framework import status
from drf_spectacular.utils import (
extend_schema,
OpenApiResponse,
OpenApiRequest,
)
# Module imports
from .base import BaseAPIView
from plane.api.serializers import UserLiteSerializer
from plane.api.serializers import UserLiteSerializer, ProjectMemberSerializer
from plane.db.models import User, Workspace, WorkspaceMember, ProjectMember
from plane.app.permissions import ProjectMemberPermission, WorkSpaceAdminPermission
from plane.utils.permissions import ProjectMemberPermission, WorkSpaceAdminPermission, ProjectAdminPermission
from plane.utils.openapi import (
WORKSPACE_SLUG_PARAMETER,
PROJECT_ID_PARAMETER,
@ -86,11 +87,15 @@ class WorkspaceMemberAPIEndpoint(BaseAPIView):
return Response(users_with_roles, status=status.HTTP_200_OK)
# API endpoint to get and insert users inside the workspace
class ProjectMemberAPIEndpoint(BaseAPIView):
class ProjectMemberListCreateAPIEndpoint(BaseAPIView):
permission_classes = [ProjectMemberPermission]
use_read_replica = True
def get_permissions(self):
if self.request.method == "GET":
return [ProjectMemberPermission()]
return [ProjectAdminPermission()]
@extend_schema(
operation_id="get_project_members",
summary="List project members",
@ -129,5 +134,86 @@ class ProjectMemberAPIEndpoint(BaseAPIView):
# Get all the users that are present inside the workspace
users = UserLiteSerializer(User.objects.filter(id__in=project_members), many=True).data
return Response(users, status=status.HTTP_200_OK)
@extend_schema(
operation_id="create_project_member",
summary="Create project member",
description="Create a new project member",
tags=["Members"],
parameters=[WORKSPACE_SLUG_PARAMETER, PROJECT_ID_PARAMETER],
responses={201: OpenApiResponse(description="Project member created", response=ProjectMemberSerializer)},
request=OpenApiRequest(request=ProjectMemberSerializer),
)
def post(self, request, slug, project_id):
serializer = ProjectMemberSerializer(data=request.data, context={"slug": slug})
serializer.is_valid(raise_exception=True)
serializer.save(project_id=project_id)
return Response(serializer.data, status=status.HTTP_201_CREATED)
# API endpoint to get and update a project member
class ProjectMemberDetailAPIEndpoint(ProjectMemberListCreateAPIEndpoint):
@extend_schema(
operation_id="get_project_member",
summary="Get project member",
description="Retrieve a project member by ID.",
tags=["Members"],
parameters=[WORKSPACE_SLUG_PARAMETER, PROJECT_ID_PARAMETER],
responses={
200: OpenApiResponse(description="Project member", response=ProjectMemberSerializer),
401: UNAUTHORIZED_RESPONSE,
403: FORBIDDEN_RESPONSE,
404: PROJECT_NOT_FOUND_RESPONSE,
},
)
# Get a project member by ID
def get(self, request, slug, project_id, pk):
"""Get project member
Retrieve a project member by ID.
Returns a project member with their project-specific roles and access levels.
"""
# Check if the workspace exists
if not Workspace.objects.filter(slug=slug).exists():
return Response(
{"error": "Provided workspace does not exist"},
status=status.HTTP_400_BAD_REQUEST,
)
# Get the workspace members that are present inside the workspace
project_members = ProjectMember.objects.get(project_id=project_id, workspace__slug=slug, pk=pk)
user = User.objects.get(id=project_members.member_id)
user = UserLiteSerializer(user).data
return Response(user, status=status.HTTP_200_OK)
@extend_schema(
operation_id="update_project_member",
summary="Update project member",
description="Update a project member",
tags=["Members"],
parameters=[WORKSPACE_SLUG_PARAMETER, PROJECT_ID_PARAMETER],
responses={200: OpenApiResponse(description="Project member updated", response=ProjectMemberSerializer)},
request=OpenApiRequest(request=ProjectMemberSerializer),
)
def patch(self, request, slug, project_id, pk):
project_member = ProjectMember.objects.get(project_id=project_id, workspace__slug=slug, pk=pk)
serializer = ProjectMemberSerializer(project_member, data=request.data, partial=True, context={"slug": slug})
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
@extend_schema(
operation_id="delete_project_member",
summary="Delete project member",
description="Delete a project member",
tags=["Members"],
parameters=[WORKSPACE_SLUG_PARAMETER, PROJECT_ID_PARAMETER],
responses={204: OpenApiResponse(description="Project member deleted")},
)
def delete(self, request, slug, project_id, pk):
project_member = ProjectMember.objects.get(project_id=project_id, workspace__slug=slug, pk=pk)
project_member.is_active = False
project_member.save()
return Response(status=status.HTTP_204_NO_CONTENT)

View file

@ -11,6 +11,7 @@ from .project import (
ProjectEntityPermission,
ProjectMemberPermission,
ProjectLitePermission,
ProjectAdminPermission,
)
from .base import allow_permission, ROLE
from .page import ProjectPagePermission

View file

@ -112,6 +112,20 @@ class ProjectEntityPermission(BasePermission):
).exists()
class ProjectAdminPermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
return ProjectMember.objects.filter(
workspace__slug=view.workspace_slug,
member=request.user,
role=ROLE.ADMIN.value,
project_id=view.project_id,
is_active=True,
).exists()
class ProjectLitePermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:

View file

@ -0,0 +1,17 @@
from .workspace import (
WorkSpaceBasePermission,
WorkspaceOwnerPermission,
WorkSpaceAdminPermission,
WorkspaceEntityPermission,
WorkspaceViewerPermission,
WorkspaceUserPermission,
)
from .project import (
ProjectBasePermission,
ProjectEntityPermission,
ProjectMemberPermission,
ProjectLitePermission,
ProjectAdminPermission,
)
from .base import allow_permission, ROLE
from .page import ProjectPagePermission

View file

@ -0,0 +1,73 @@
from plane.db.models import WorkspaceMember, ProjectMember
from functools import wraps
from rest_framework.response import Response
from rest_framework import status
from enum import Enum
class ROLE(Enum):
ADMIN = 20
MEMBER = 15
GUEST = 5
def allow_permission(allowed_roles, level="PROJECT", creator=False, model=None):
def decorator(view_func):
@wraps(view_func)
def _wrapped_view(instance, request, *args, **kwargs):
# Check for creator if required
if creator and model:
obj = model.objects.filter(id=kwargs["pk"], created_by=request.user).exists()
if obj:
return view_func(instance, request, *args, **kwargs)
# Convert allowed_roles to their values if they are enum members
allowed_role_values = [role.value if isinstance(role, ROLE) else role for role in allowed_roles]
# Check role permissions
if level == "WORKSPACE":
if WorkspaceMember.objects.filter(
member=request.user,
workspace__slug=kwargs["slug"],
role__in=allowed_role_values,
is_active=True,
).exists():
return view_func(instance, request, *args, **kwargs)
else:
is_user_has_allowed_role = ProjectMember.objects.filter(
member=request.user,
workspace__slug=kwargs["slug"],
project_id=kwargs["project_id"],
role__in=allowed_role_values,
is_active=True,
).exists()
# Return if the user has the allowed role else if they are workspace admin and part of the project regardless of the role # noqa: E501
if is_user_has_allowed_role:
return view_func(instance, request, *args, **kwargs)
elif (
ProjectMember.objects.filter(
member=request.user,
workspace__slug=kwargs["slug"],
project_id=kwargs["project_id"],
is_active=True,
).exists()
and WorkspaceMember.objects.filter(
member=request.user,
workspace__slug=kwargs["slug"],
role=ROLE.ADMIN.value,
is_active=True,
).exists()
):
return view_func(instance, request, *args, **kwargs)
# Return permission denied if no conditions are met
return Response(
{"error": "You don't have the required permissions."},
status=status.HTTP_403_FORBIDDEN,
)
return _wrapped_view
return decorator

View file

@ -0,0 +1,121 @@
from plane.db.models import ProjectMember, Page
from plane.app.permissions import ROLE
from rest_framework.permissions import BasePermission, SAFE_METHODS
# Permission Mappings for workspace members
ADMIN = ROLE.ADMIN.value
MEMBER = ROLE.MEMBER.value
GUEST = ROLE.GUEST.value
class ProjectPagePermission(BasePermission):
"""
Custom permission to control access to pages within a workspace
based on user roles, page visibility (public/private), and feature flags.
"""
def has_permission(self, request, view):
"""
Check basic project-level permissions before checking object-level permissions.
"""
if request.user.is_anonymous:
return False
user_id = request.user.id
slug = view.kwargs.get("slug")
page_id = view.kwargs.get("page_id")
project_id = view.kwargs.get("project_id")
# Hook for extended validation
extended_access, role = self._check_access_and_get_role(request, slug, project_id)
if extended_access is False:
return False
if page_id:
page = Page.objects.get(id=page_id, workspace__slug=slug)
# Allow access if the user is the owner of the page
if page.owned_by_id == user_id:
return True
# Handle private page access
if page.access == Page.PRIVATE_ACCESS:
return self._has_private_page_action_access(request, slug, page, project_id)
# Handle public page access
return self._has_public_page_action_access(request, role)
def _check_project_member_access(self, request, slug, project_id):
"""
Check if the user is a project member.
"""
return (
ProjectMember.objects.filter(
member=request.user,
workspace__slug=slug,
is_active=True,
project_id=project_id,
)
.values_list("role", flat=True)
.first()
)
def _check_access_and_get_role(self, request, slug, project_id):
"""
Hook for extended access checking
Returns: True (allow), False (deny), None (continue with normal flow)
"""
role = self._check_project_member_access(request, slug, project_id)
if not role:
return False, None
return True, role
def _has_private_page_action_access(self, request, slug, page, project_id):
"""
Check access to private pages. Override for feature flag logic.
"""
# Base implementation: only owner can access private pages
return False
def _check_project_action_access(self, request, role):
method = request.method
# Only admins can create (POST) pages
if method == "POST":
if role in [ADMIN, MEMBER]:
return True
return False
# Safe methods (GET, HEAD, OPTIONS) allowed for all active roles
if method in SAFE_METHODS:
if role in [ADMIN, MEMBER, GUEST]:
return True
return False
# PUT/PATCH: Admins and members can update
if method in ["PUT", "PATCH"]:
if role in [ADMIN, MEMBER]:
return True
return False
# DELETE: Only admins can delete
if method == "DELETE":
if role in [ADMIN]:
return True
return False
# Deny by default
return False
def _has_public_page_action_access(self, request, role):
"""
Check if the user has permission to access a public page
and can perform operations on the page.
"""
project_member_exists = self._check_project_action_access(request, role)
if not project_member_exists:
return False
return True

View file

@ -0,0 +1,139 @@
# Third Party imports
from rest_framework.permissions import SAFE_METHODS, BasePermission
# Module import
from plane.db.models import ProjectMember, WorkspaceMember
from plane.db.models.project import ROLE
class ProjectBasePermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
## Safe Methods -> Handle the filtering logic in queryset
if request.method in SAFE_METHODS:
return WorkspaceMember.objects.filter(
workspace__slug=view.workspace_slug, member=request.user, is_active=True
).exists()
## Only workspace owners or admins can create the projects
if request.method == "POST":
return WorkspaceMember.objects.filter(
workspace__slug=view.workspace_slug,
member=request.user,
role__in=[ROLE.ADMIN.value, ROLE.MEMBER.value],
is_active=True,
).exists()
project_member_qs = ProjectMember.objects.filter(
workspace__slug=view.workspace_slug,
member=request.user,
project_id=view.project_id,
is_active=True,
)
## Only project admins or workspace admin who is part of the project can access
if project_member_qs.filter(role=ROLE.ADMIN.value).exists():
return True
else:
return (
project_member_qs.exists()
and WorkspaceMember.objects.filter(
member=request.user,
workspace__slug=view.workspace_slug,
role=ROLE.ADMIN.value,
is_active=True,
).exists()
)
class ProjectMemberPermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
## Safe Methods -> Handle the filtering logic in queryset
if request.method in SAFE_METHODS:
return ProjectMember.objects.filter(
workspace__slug=view.workspace_slug, member=request.user, is_active=True
).exists()
## Only workspace owners or admins can create the projects
if request.method == "POST":
return WorkspaceMember.objects.filter(
workspace__slug=view.workspace_slug,
member=request.user,
role__in=[ROLE.ADMIN.value, ROLE.MEMBER.value],
is_active=True,
).exists()
## Only Project Admins can update project attributes
return ProjectMember.objects.filter(
workspace__slug=view.workspace_slug,
member=request.user,
role__in=[ROLE.ADMIN.value, ROLE.MEMBER.value],
project_id=view.project_id,
is_active=True,
).exists()
class ProjectEntityPermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
# Handle requests based on project__identifier
if hasattr(view, "project_identifier") and view.project_identifier:
if request.method in SAFE_METHODS:
return ProjectMember.objects.filter(
workspace__slug=view.workspace_slug,
member=request.user,
project__identifier=view.project_identifier,
is_active=True,
).exists()
## Safe Methods -> Handle the filtering logic in queryset
if request.method in SAFE_METHODS:
return ProjectMember.objects.filter(
workspace__slug=view.workspace_slug,
member=request.user,
project_id=view.project_id,
is_active=True,
).exists()
## Only project members or admins can create and edit the project attributes
return ProjectMember.objects.filter(
workspace__slug=view.workspace_slug,
member=request.user,
role__in=[ROLE.ADMIN.value, ROLE.MEMBER.value],
project_id=view.project_id,
is_active=True,
).exists()
class ProjectAdminPermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
return ProjectMember.objects.filter(
workspace__slug=view.workspace_slug,
member=request.user,
role=ROLE.ADMIN.value,
project_id=view.project_id,
is_active=True,
).exists()
class ProjectLitePermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
return ProjectMember.objects.filter(
workspace__slug=view.workspace_slug,
member=request.user,
project_id=view.project_id,
is_active=True,
).exists()

View file

@ -0,0 +1,106 @@
# Third Party imports
from rest_framework.permissions import BasePermission, SAFE_METHODS
# Module imports
from plane.db.models import WorkspaceMember
# Permission Mappings
Admin = 20
Member = 15
Guest = 5
# TODO: Move the below logic to python match - python v3.10
class WorkSpaceBasePermission(BasePermission):
def has_permission(self, request, view):
# allow anyone to create a workspace
if request.user.is_anonymous:
return False
if request.method == "POST":
return True
## Safe Methods
if request.method in SAFE_METHODS:
return True
# allow only admins and owners to update the workspace settings
if request.method in ["PUT", "PATCH"]:
return WorkspaceMember.objects.filter(
member=request.user,
workspace__slug=view.workspace_slug,
role__in=[Admin, Member],
is_active=True,
).exists()
# allow only owner to delete the workspace
if request.method == "DELETE":
return WorkspaceMember.objects.filter(
member=request.user,
workspace__slug=view.workspace_slug,
role=Admin,
is_active=True,
).exists()
class WorkspaceOwnerPermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
return WorkspaceMember.objects.filter(
workspace__slug=view.workspace_slug, member=request.user, role=Admin
).exists()
class WorkSpaceAdminPermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
return WorkspaceMember.objects.filter(
member=request.user,
workspace__slug=view.workspace_slug,
role__in=[Admin, Member],
is_active=True,
).exists()
class WorkspaceEntityPermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
## Safe Methods -> Handle the filtering logic in queryset
if request.method in SAFE_METHODS:
return WorkspaceMember.objects.filter(
workspace__slug=view.workspace_slug, member=request.user, is_active=True
).exists()
return WorkspaceMember.objects.filter(
member=request.user,
workspace__slug=view.workspace_slug,
role__in=[Admin, Member],
is_active=True,
).exists()
class WorkspaceViewerPermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
return WorkspaceMember.objects.filter(
member=request.user, workspace__slug=view.workspace_slug, is_active=True
).exists()
class WorkspaceUserPermission(BasePermission):
def has_permission(self, request, view):
if request.user.is_anonymous:
return False
return WorkspaceMember.objects.filter(
member=request.user, workspace__slug=view.workspace_slug, is_active=True
).exists()