bb-plane-fork/apps/api/plane/app/views/estimate/base.py
sriram veeraghanta 9237f568dd
[WEB-5044] fix: ruff lint and format errors (#7868)
* fix: lint errors

* fix: file formatting

* fix: code refactor
2025-09-29 19:15:32 +05:30

243 lines
10 KiB
Python

import random
import string
import json
# Django imports
from django.utils import timezone
# Third party imports
from rest_framework.response import Response
from rest_framework import status
# Module imports
from ..base import BaseViewSet, BaseAPIView
from plane.app.permissions import ProjectEntityPermission, allow_permission, ROLE
from plane.db.models import Project, Estimate, EstimatePoint, Issue
from plane.app.serializers import (
EstimateSerializer,
EstimatePointSerializer,
EstimateReadSerializer,
)
from plane.utils.cache import invalidate_cache
from plane.bgtasks.issue_activities_task import issue_activity
def generate_random_name(length=10):
letters = string.ascii_lowercase
return "".join(random.choice(letters) for i in range(length))
class ProjectEstimatePointEndpoint(BaseAPIView):
@allow_permission([ROLE.ADMIN, ROLE.MEMBER])
def get(self, request, slug, project_id):
project = Project.objects.get(workspace__slug=slug, pk=project_id)
if project.estimate_id is not None:
estimate_points = EstimatePoint.objects.filter(
estimate_id=project.estimate_id,
project_id=project_id,
workspace__slug=slug,
)
serializer = EstimatePointSerializer(estimate_points, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
return Response([], status=status.HTTP_200_OK)
class BulkEstimatePointEndpoint(BaseViewSet):
permission_classes = [ProjectEntityPermission]
model = Estimate
serializer_class = EstimateSerializer
def list(self, request, slug, project_id):
estimates = (
Estimate.objects.filter(workspace__slug=slug, project_id=project_id)
.prefetch_related("points")
.select_related("workspace", "project")
)
serializer = EstimateReadSerializer(estimates, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
@invalidate_cache(path="/api/workspaces/:slug/estimates/", url_params=True, user=False)
def create(self, request, slug, project_id):
estimate = request.data.get("estimate")
estimate_name = estimate.get("name", generate_random_name())
estimate_type = estimate.get("type", "categories")
last_used = estimate.get("last_used", False)
estimate = Estimate.objects.create(
name=estimate_name,
project_id=project_id,
last_used=last_used,
type=estimate_type,
)
estimate_points = request.data.get("estimate_points", [])
serializer = EstimatePointSerializer(data=request.data.get("estimate_points"), many=True)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
estimate_points = EstimatePoint.objects.bulk_create(
[
EstimatePoint(
estimate=estimate,
key=estimate_point.get("key", 0),
value=estimate_point.get("value", ""),
description=estimate_point.get("description", ""),
project_id=project_id,
workspace_id=estimate.workspace_id,
created_by=request.user,
updated_by=request.user,
)
for estimate_point in estimate_points
],
batch_size=10,
ignore_conflicts=True,
)
serializer = EstimateReadSerializer(estimate)
return Response(serializer.data, status=status.HTTP_200_OK)
def retrieve(self, request, slug, project_id, estimate_id):
estimate = Estimate.objects.get(pk=estimate_id, workspace__slug=slug, project_id=project_id)
serializer = EstimateReadSerializer(estimate)
return Response(serializer.data, status=status.HTTP_200_OK)
@invalidate_cache(path="/api/workspaces/:slug/estimates/", url_params=True, user=False)
def partial_update(self, request, slug, project_id, estimate_id):
if not len(request.data.get("estimate_points", [])):
return Response(
{"error": "Estimate points are required"},
status=status.HTTP_400_BAD_REQUEST,
)
estimate = Estimate.objects.get(pk=estimate_id)
if request.data.get("estimate"):
estimate.name = request.data.get("estimate").get("name", estimate.name)
estimate.type = request.data.get("estimate").get("type", estimate.type)
estimate.save()
estimate_points_data = request.data.get("estimate_points", [])
estimate_points = EstimatePoint.objects.filter(
pk__in=[estimate_point.get("id") for estimate_point in estimate_points_data],
workspace__slug=slug,
project_id=project_id,
estimate_id=estimate_id,
)
updated_estimate_points = []
for estimate_point in estimate_points:
# Find the data for that estimate point
estimate_point_data = [point for point in estimate_points_data if point.get("id") == str(estimate_point.id)]
if len(estimate_point_data):
estimate_point.value = estimate_point_data[0].get("value", estimate_point.value)
estimate_point.key = estimate_point_data[0].get("key", estimate_point.key)
updated_estimate_points.append(estimate_point)
EstimatePoint.objects.bulk_update(updated_estimate_points, ["key", "value"], batch_size=10)
estimate_serializer = EstimateReadSerializer(estimate)
return Response(estimate_serializer.data, status=status.HTTP_200_OK)
@invalidate_cache(path="/api/workspaces/:slug/estimates/", url_params=True, user=False)
def destroy(self, request, slug, project_id, estimate_id):
estimate = Estimate.objects.get(pk=estimate_id, workspace__slug=slug, project_id=project_id)
estimate.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
class EstimatePointEndpoint(BaseViewSet):
@allow_permission([ROLE.ADMIN, ROLE.MEMBER])
def create(self, request, slug, project_id, estimate_id):
# TODO: add a key validation if the same key already exists
if not request.data.get("key") or not request.data.get("value"):
return Response(
{"error": "Key and value are required"},
status=status.HTTP_400_BAD_REQUEST,
)
key = request.data.get("key", 0)
value = request.data.get("value", "")
estimate_point = EstimatePoint.objects.create(
estimate_id=estimate_id, project_id=project_id, key=key, value=value
)
serializer = EstimatePointSerializer(estimate_point).data
return Response(serializer, status=status.HTTP_200_OK)
@allow_permission([ROLE.ADMIN, ROLE.MEMBER])
def partial_update(self, request, slug, project_id, estimate_id, estimate_point_id):
# TODO: add a key validation if the same key already exists
estimate_point = EstimatePoint.objects.get(
pk=estimate_point_id,
estimate_id=estimate_id,
project_id=project_id,
workspace__slug=slug,
)
serializer = EstimatePointSerializer(estimate_point, data=request.data, partial=True)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
@allow_permission([ROLE.ADMIN, ROLE.MEMBER])
def destroy(self, request, slug, project_id, estimate_id, estimate_point_id):
new_estimate_id = request.data.get("new_estimate_id", None)
estimate_points = EstimatePoint.objects.filter(
estimate_id=estimate_id, project_id=project_id, workspace__slug=slug
)
# update all the issues with the new estimate
if new_estimate_id:
issues = Issue.objects.filter(
project_id=project_id,
workspace__slug=slug,
estimate_point_id=estimate_point_id,
)
for issue in issues:
issue_activity.delay(
type="issue.activity.updated",
requested_data=json.dumps({"estimate_point": (str(new_estimate_id) if new_estimate_id else None)}),
actor_id=str(request.user.id),
issue_id=issue.id,
project_id=str(project_id),
current_instance=json.dumps(
{"estimate_point": (str(issue.estimate_point_id) if issue.estimate_point_id else None)}
),
epoch=int(timezone.now().timestamp()),
)
issues.update(estimate_point_id=new_estimate_id)
else:
issues = Issue.objects.filter(
project_id=project_id,
workspace__slug=slug,
estimate_point_id=estimate_point_id,
)
for issue in issues:
issue_activity.delay(
type="issue.activity.updated",
requested_data=json.dumps({"estimate_point": None}),
actor_id=str(request.user.id),
issue_id=issue.id,
project_id=str(project_id),
current_instance=json.dumps(
{"estimate_point": (str(issue.estimate_point_id) if issue.estimate_point_id else None)}
),
epoch=int(timezone.now().timestamp()),
)
# delete the estimate point
old_estimate_point = EstimatePoint.objects.filter(pk=estimate_point_id).first()
# rearrange the estimate points
updated_estimate_points = []
for estimate_point in estimate_points:
if estimate_point.key > old_estimate_point.key:
estimate_point.key -= 1
updated_estimate_points.append(estimate_point)
EstimatePoint.objects.bulk_update(updated_estimate_points, ["key"], batch_size=10)
old_estimate_point.delete()
return Response(
EstimatePointSerializer(updated_estimate_points, many=True).data,
status=status.HTTP_200_OK,
)