bb-plane-fork/apiserver/plane/app/views/base.py
Nikhil 1fa47a6c04
[WEB - 549] dev: formatting and removing unused imports (#3782)
* dev: formatting and removing unused imports

* dev: remove unused imports and format all the files

* fix: linting errors

* dev: format using ruff

* dev: remove unused variables
2024-03-06 20:48:30 +05:30

284 lines
8.2 KiB
Python

# Python imports
import zoneinfo
# Django imports
from django.urls import resolve
from django.conf import settings
from django.utils import timezone
from django.db import IntegrityError
from django.core.exceptions import ObjectDoesNotExist, ValidationError
# Third party imports
from rest_framework import status
from rest_framework.viewsets import ModelViewSet
from rest_framework.response import Response
from rest_framework.exceptions import APIException
from rest_framework.views import APIView
from rest_framework.filters import SearchFilter
from rest_framework.permissions import IsAuthenticated
from sentry_sdk import capture_exception
from django_filters.rest_framework import DjangoFilterBackend
# Module imports
from plane.utils.paginator import BasePaginator
from plane.bgtasks.webhook_task import send_webhook
class TimezoneMixin:
    """
    View mixin that switches Django's active timezone to the one stored on
    the requesting user, and clears the active timezone for anonymous
    requests.
    """

    def initial(self, request, *args, **kwargs):
        """Run the parent `initial` hook, then set the timezone for this request."""
        super().initial(request, *args, **kwargs)

        # Anonymous requests have no stored preference: drop any timezone
        # that is currently active.
        if not request.user.is_authenticated:
            timezone.deactivate()
            return

        user_tz = zoneinfo.ZoneInfo(request.user.user_timezone)
        timezone.activate(user_tz)
class WebhookMixin:
    """
    View mixin that hands the response payload to the `send_webhook`
    background task after a successful write request.

    The host view is expected to provide `request`, `kwargs` and
    `workspace_slug`.
    """

    # Event name forwarded to the webhook task; a falsy value disables it.
    webhook_event = None
    # Forwarded unchanged to the webhook task as its `bulk` argument.
    bulk = False

    def finalize_response(self, request, response, *args, **kwargs):
        """Finalize the response as usual, then dispatch the webhook when eligible."""
        finalized = super().finalize_response(
            request, response, *args, **kwargs
        )

        # Guard clauses, checked in the same order as the conditions they
        # replace: event configured -> write method -> success status.
        if not self.webhook_event:
            return finalized
        if self.request.method not in ("POST", "PATCH", "DELETE"):
            return finalized
        if finalized.status_code not in (200, 201, 204):
            return finalized

        # Push the object to delay
        send_webhook.delay(
            event=self.webhook_event,
            payload=finalized.data,
            kw=self.kwargs,
            action=self.request.method,
            slug=self.workspace_slug,
            bulk=self.bulk,
            current_site=request.META.get("HTTP_ORIGIN"),
        )
        return finalized
class BaseViewSet(TimezoneMixin, ModelViewSet, BasePaginator):
    """
    Base model viewset: requires authentication, enables the filter and
    search backends, and converts common exceptions into JSON error
    responses.
    """

    # Model whose default manager backs `get_queryset`; set by subclasses.
    model = None

    permission_classes = [
        IsAuthenticated,
    ]

    filter_backends = (
        DjangoFilterBackend,
        SearchFilter,
    )

    filterset_fields = []

    search_fields = []

    def get_queryset(self):
        """
        Return all objects of `self.model`.

        Any failure is reported through `capture_exception` and re-raised
        as an `APIException`.
        """
        try:
            return self.model.objects.all()
        except Exception as e:
            capture_exception(e)
            raise APIException(
                "Please check the view", status.HTTP_400_BAD_REQUEST
            )

    def handle_exception(self, exc):
        """
        Handle any exception that occurs, by returning an appropriate response,
        or re-raising the error.

        Exceptions that the parent handler re-raises are mapped here to a
        JSON error response: 400 for integrity/validation/key errors, 404
        for missing objects, and 500 for everything else.
        """
        try:
            response = super().handle_exception(exc)
            return response
        except Exception as e:
            # Only expose the exception text on stdout in DEBUG mode.
            if settings.DEBUG:
                print(e)
            else:
                print("Server Error")

            if isinstance(e, IntegrityError):
                return Response(
                    {"error": "The payload is not valid"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            if isinstance(e, ValidationError):
                return Response(
                    {"error": "Please provide valid detail"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            if isinstance(e, ObjectDoesNotExist):
                return Response(
                    {"error": "The required object does not exist."},
                    status=status.HTTP_404_NOT_FOUND,
                )

            if isinstance(e, KeyError):
                capture_exception(e)
                return Response(
                    {"error": "The required key does not exist."},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Anything unrecognised is reported and surfaced as a generic 500.
            capture_exception(e)
            return Response(
                {"error": "Something went wrong please try again later"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    def dispatch(self, request, *args, **kwargs):
        """
        Dispatch the request; in DEBUG mode also print the number of DB
        queries recorded on the connection.

        Exceptions escaping the parent `dispatch` are converted to an error
        response via `handle_exception`.
        """
        try:
            response = super().dispatch(request, *args, **kwargs)

            if settings.DEBUG:
                from django.db import connection

                print(
                    f"{request.method} - {request.get_full_path()} of Queries: {len(connection.queries)}"
                )

            return response
        except Exception as exc:
            # Return the error response built by handle_exception. The
            # previous code returned the exception object itself, which is
            # not a valid view return value.
            # NOTE(review): this response is created outside the parent's
            # dispatch flow, so it may not have been passed through
            # finalize_response -- confirm it renders correctly.
            response = self.handle_exception(exc)
            return response

    @property
    def workspace_slug(self):
        """The `slug` URL keyword argument, or None when absent."""
        return self.kwargs.get("slug", None)

    @property
    def project_id(self):
        """
        The `project_id` URL keyword argument. When it is missing and the
        resolved URL name is "project", fall back to the `pk` keyword
        argument; otherwise None.
        """
        project_id = self.kwargs.get("project_id", None)
        if project_id:
            return project_id

        if resolve(self.request.path_info).url_name == "project":
            return self.kwargs.get("pk", None)

        return None

    @property
    def fields(self):
        """Non-empty entries of the comma-separated `fields` query parameter, or None."""
        fields = [
            field
            for field in self.request.GET.get("fields", "").split(",")
            if field
        ]
        return fields if fields else None

    @property
    def expand(self):
        """Non-empty entries of the comma-separated `expand` query parameter, or None."""
        expand = [
            expand
            for expand in self.request.GET.get("expand", "").split(",")
            if expand
        ]
        return expand if expand else None
class BaseAPIView(TimezoneMixin, APIView, BasePaginator):
    """
    Base API view: requires authentication, applies the configured filter
    backends on demand, and converts common exceptions into JSON error
    responses.
    """

    permission_classes = [
        IsAuthenticated,
    ]

    filter_backends = (
        DjangoFilterBackend,
        SearchFilter,
    )

    filterset_fields = []

    search_fields = []

    def filter_queryset(self, queryset):
        """Run `queryset` through every backend in `filter_backends`, in order."""
        for backend in list(self.filter_backends):
            queryset = backend().filter_queryset(self.request, queryset, self)
        return queryset

    def handle_exception(self, exc):
        """
        Handle any exception that occurs, by returning an appropriate response,
        or re-raising the error.

        Exceptions that the parent handler re-raises are mapped here to a
        JSON error response: 400 for integrity/validation/key errors, 404
        for missing objects, and 500 for everything else.
        """
        try:
            response = super().handle_exception(exc)
            return response
        except Exception as e:
            if isinstance(e, IntegrityError):
                return Response(
                    {"error": "The payload is not valid"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            if isinstance(e, ValidationError):
                return Response(
                    {"error": "Please provide valid detail"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            if isinstance(e, ObjectDoesNotExist):
                return Response(
                    {"error": "The required object does not exist."},
                    status=status.HTTP_404_NOT_FOUND,
                )

            if isinstance(e, KeyError):
                return Response(
                    {"error": "The required key does not exist."},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Anything unrecognised is printed in DEBUG mode, reported, and
            # surfaced as a generic 500.
            if settings.DEBUG:
                print(e)
            capture_exception(e)
            return Response(
                {"error": "Something went wrong please try again later"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    def dispatch(self, request, *args, **kwargs):
        """
        Dispatch the request; in DEBUG mode also print the number of DB
        queries recorded on the connection.

        Exceptions escaping the parent `dispatch` are converted to an error
        response via `handle_exception`.
        """
        try:
            response = super().dispatch(request, *args, **kwargs)

            if settings.DEBUG:
                from django.db import connection

                print(
                    f"{request.method} - {request.get_full_path()} of Queries: {len(connection.queries)}"
                )
            return response

        except Exception as exc:
            # Return the error response built by handle_exception. The
            # previous code returned the exception object itself, which is
            # not a valid view return value.
            # NOTE(review): this response is created outside the parent's
            # dispatch flow, so it may not have been passed through
            # finalize_response -- confirm it renders correctly.
            response = self.handle_exception(exc)
            return response

    @property
    def workspace_slug(self):
        """The `slug` URL keyword argument, or None when absent."""
        return self.kwargs.get("slug", None)

    @property
    def project_id(self):
        """The `project_id` URL keyword argument, or None when absent."""
        return self.kwargs.get("project_id", None)

    @property
    def fields(self):
        """Non-empty entries of the comma-separated `fields` query parameter, or None."""
        fields = [
            field
            for field in self.request.GET.get("fields", "").split(",")
            if field
        ]
        return fields if fields else None

    @property
    def expand(self):
        """Non-empty entries of the comma-separated `expand` query parameter, or None."""
        expand = [
            expand
            for expand in self.request.GET.get("expand", "").split(",")
            if expand
        ]
        return expand if expand else None