chore: rename server to api (#7342)

This commit is contained in:
sriram veeraghanta 2025-07-04 15:32:21 +05:30 committed by GitHub
parent 6bee97eb26
commit fdbe4c2ca6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
554 changed files with 39 additions and 43 deletions

View file

View file

@ -0,0 +1,248 @@
# Python imports
from datetime import timedelta
from itertools import groupby
# Django import
from django.db import models
from django.db.models import Case, CharField, Count, F, Sum, Value, When, FloatField
from django.db.models.functions import (
Coalesce,
Concat,
ExtractMonth,
ExtractYear,
TruncDate,
Cast,
)
from django.utils import timezone
# Module imports
from plane.db.models import Issue, Project
def annotate_with_monthly_dimension(queryset, field_name, attribute):
# Get the year and the months
year = ExtractYear(field_name)
month = ExtractMonth(field_name)
# Concat the year and month
dimension = Concat(year, Value("-"), month, output_field=CharField())
# Annotate the dimension
return queryset.annotate(**{attribute: dimension})
def extract_axis(queryset, x_axis):
# Format the dimension when the axis is in date
if x_axis in ["created_at", "start_date", "target_date", "completed_at"]:
queryset = annotate_with_monthly_dimension(queryset, x_axis, "dimension")
return queryset, "dimension"
else:
return queryset.annotate(dimension=F(x_axis)), "dimension"
def sort_data(data, temp_axis):
# When the axis is in priority order by
if temp_axis == "priority":
order = ["low", "medium", "high", "urgent", "none"]
return {key: data[key] for key in order if key in data}
else:
return dict(sorted(data.items(), key=lambda x: (x[0] == "none", x[0])))
def build_graph_plot(queryset, x_axis, y_axis, segment=None):
# temp x_axis
temp_axis = x_axis
# Extract the x_axis and queryset
queryset, x_axis = extract_axis(queryset, x_axis)
if x_axis == "dimension":
queryset = queryset.exclude(dimension__isnull=True)
#
if segment in ["created_at", "start_date", "target_date", "completed_at"]:
queryset = annotate_with_monthly_dimension(queryset, segment, "segmented")
segment = "segmented"
queryset = queryset.values(x_axis)
# Issue count
if y_axis == "issue_count":
queryset = queryset.annotate(
is_null=Case(
When(dimension__isnull=True, then=Value("None")),
default=Value("not_null"),
output_field=models.CharField(max_length=8),
),
dimension_ex=Coalesce("dimension", Value("null")),
).values("dimension")
queryset = queryset.annotate(segment=F(segment)) if segment else queryset
queryset = (
queryset.values("dimension", "segment")
if segment
else queryset.values("dimension")
)
queryset = queryset.annotate(count=Count("*")).order_by("dimension")
# Estimate
else:
queryset = queryset.annotate(
estimate=Sum(Cast("estimate_point__value", FloatField()))
).order_by(x_axis)
queryset = queryset.annotate(segment=F(segment)) if segment else queryset
queryset = (
queryset.values("dimension", "segment", "estimate")
if segment
else queryset.values("dimension", "estimate")
)
result_values = list(queryset)
grouped_data = {
str(key): list(items)
for key, items in groupby(result_values, key=lambda x: x[str("dimension")])
}
return sort_data(grouped_data, temp_axis)
def burndown_plot(queryset, slug, project_id, plot_type, cycle_id=None, module_id=None):
# Total Issues in Cycle or Module
total_issues = queryset.total_issues
# check whether the estimate is a point or not
estimate_type = Project.objects.filter(
workspace__slug=slug,
pk=project_id,
estimate__isnull=False,
estimate__type="points",
).exists()
if estimate_type and plot_type == "points" and cycle_id:
issue_estimates = Issue.issue_objects.filter(
workspace__slug=slug,
project_id=project_id,
issue_cycle__cycle_id=cycle_id,
issue_cycle__deleted_at__isnull=True,
estimate_point__isnull=False,
).values_list("estimate_point__value", flat=True)
issue_estimates = [float(value) for value in issue_estimates]
total_estimate_points = sum(issue_estimates)
if estimate_type and plot_type == "points" and module_id:
issue_estimates = Issue.issue_objects.filter(
workspace__slug=slug,
project_id=project_id,
issue_module__module_id=module_id,
issue_module__deleted_at__isnull=True,
estimate_point__isnull=False,
).values_list("estimate_point__value", flat=True)
issue_estimates = [float(value) for value in issue_estimates]
total_estimate_points = sum(issue_estimates)
if cycle_id:
if queryset.end_date and queryset.start_date:
# Get all dates between the two dates
date_range = [
(queryset.start_date + timedelta(days=x)).date()
for x in range(
(queryset.end_date.date() - queryset.start_date.date()).days + 1
)
]
else:
date_range = []
chart_data = {str(date): 0 for date in date_range}
if plot_type == "points":
completed_issues_estimate_point_distribution = (
Issue.issue_objects.filter(
workspace__slug=slug,
project_id=project_id,
issue_cycle__cycle_id=cycle_id,
issue_cycle__deleted_at__isnull=True,
estimate_point__isnull=False,
)
.annotate(date=TruncDate("completed_at"))
.values("date")
.values("date", "estimate_point__value")
.order_by("date")
)
else:
completed_issues_distribution = (
Issue.issue_objects.filter(
workspace__slug=slug,
project_id=project_id,
issue_cycle__cycle_id=cycle_id,
issue_cycle__deleted_at__isnull=True,
)
.annotate(date=TruncDate("completed_at"))
.values("date")
.annotate(total_completed=Count("id"))
.values("date", "total_completed")
.order_by("date")
)
if module_id:
# Get all dates between the two dates
date_range = [
(queryset.start_date + timedelta(days=x))
for x in range((queryset.target_date - queryset.start_date).days + 1)
]
chart_data = {str(date): 0 for date in date_range}
if plot_type == "points":
completed_issues_estimate_point_distribution = (
Issue.issue_objects.filter(
workspace__slug=slug,
project_id=project_id,
issue_module__module_id=module_id,
issue_module__deleted_at__isnull=True,
estimate_point__isnull=False,
)
.annotate(date=TruncDate("completed_at"))
.values("date")
.values("date", "estimate_point__value")
.order_by("date")
)
else:
completed_issues_distribution = (
Issue.issue_objects.filter(
workspace__slug=slug,
project_id=project_id,
issue_module__module_id=module_id,
issue_module__deleted_at__isnull=True,
)
.annotate(date=TruncDate("completed_at"))
.values("date")
.annotate(total_completed=Count("id"))
.values("date", "total_completed")
.order_by("date")
)
if plot_type == "points":
for date in date_range:
cumulative_pending_issues = total_estimate_points
total_completed = 0
total_completed = sum(
float(item["estimate_point__value"])
for item in completed_issues_estimate_point_distribution
if item["date"] is not None and item["date"] <= date
)
cumulative_pending_issues -= total_completed
if date > timezone.now().date():
chart_data[str(date)] = None
else:
chart_data[str(date)] = cumulative_pending_issues
else:
for date in date_range:
cumulative_pending_issues = total_issues
total_completed = 0
total_completed = sum(
item["total_completed"]
for item in completed_issues_distribution
if item["date"] is not None and item["date"] <= date
)
cumulative_pending_issues -= total_completed
if date > timezone.now().date():
chart_data[str(date)] = None
else:
chart_data[str(date)] = cumulative_pending_issues
return chart_data

View file

@ -0,0 +1,204 @@
from typing import Dict, Any, Tuple, Optional, List, Union
# Django imports
from django.db.models import (
Count,
F,
QuerySet,
Aggregate,
)
from plane.db.models import Issue
from rest_framework.exceptions import ValidationError
x_axis_mapper = {
"STATES": "STATES",
"STATE_GROUPS": "STATE_GROUPS",
"LABELS": "LABELS",
"ASSIGNEES": "ASSIGNEES",
"ESTIMATE_POINTS": "ESTIMATE_POINTS",
"CYCLES": "CYCLES",
"MODULES": "MODULES",
"PRIORITY": "PRIORITY",
"START_DATE": "START_DATE",
"TARGET_DATE": "TARGET_DATE",
"CREATED_AT": "CREATED_AT",
"COMPLETED_AT": "COMPLETED_AT",
"CREATED_BY": "CREATED_BY",
}
def get_y_axis_filter(y_axis: str) -> Dict[str, Any]:
filter_mapping = {
"WORK_ITEM_COUNT": {"id": F("id")},
}
return filter_mapping.get(y_axis, {})
def get_x_axis_field() -> Dict[str, Tuple[str, str, Optional[Dict[str, Any]]]]:
return {
"STATES": ("state__id", "state__name", None),
"STATE_GROUPS": ("state__group", "state__group", None),
"LABELS": (
"labels__id",
"labels__name",
{"label_issue__deleted_at__isnull": True},
),
"ASSIGNEES": (
"assignees__id",
"assignees__display_name",
{"issue_assignee__deleted_at__isnull": True},
),
"ESTIMATE_POINTS": ("estimate_point__value", "estimate_point__key", None),
"CYCLES": (
"issue_cycle__cycle_id",
"issue_cycle__cycle__name",
{"issue_cycle__deleted_at__isnull": True},
),
"MODULES": (
"issue_module__module_id",
"issue_module__module__name",
{"issue_module__deleted_at__isnull": True},
),
"PRIORITY": ("priority", "priority", None),
"START_DATE": ("start_date", "start_date", None),
"TARGET_DATE": ("target_date", "target_date", None),
"CREATED_AT": ("created_at__date", "created_at__date", None),
"COMPLETED_AT": ("completed_at__date", "completed_at__date", None),
"CREATED_BY": ("created_by_id", "created_by__display_name", None),
}
def process_grouped_data(
data: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
response = {}
schema = {}
for item in data:
key = item["key"]
if key not in response:
response[key] = {
"key": key if key else "none",
"name": (
item.get("display_name", key)
if item.get("display_name", key)
else "None"
),
"count": 0,
}
group_key = str(item["group_key"]) if item["group_key"] else "none"
schema[group_key] = item.get("group_name", item["group_key"])
schema[group_key] = schema[group_key] if schema[group_key] else "None"
response[key][group_key] = response[key].get(group_key, 0) + item["count"]
response[key]["count"] += item["count"]
return list(response.values()), schema
def build_number_chart_response(
queryset: QuerySet[Issue],
y_axis_filter: Dict[str, Any],
y_axis: str,
aggregate_func: Aggregate,
) -> List[Dict[str, Any]]:
count = (
queryset.filter(**y_axis_filter).aggregate(total=aggregate_func).get("total", 0)
)
return [{"key": y_axis, "name": y_axis, "count": count}]
def build_grouped_chart_response(
queryset: QuerySet[Issue],
id_field: str,
name_field: str,
group_field: str,
group_name_field: str,
aggregate_func: Aggregate,
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
data = (
queryset.annotate(
key=F(id_field),
group_key=F(group_field),
group_name=F(group_name_field),
display_name=F(name_field) if name_field else F(id_field),
)
.values("key", "group_key", "group_name", "display_name")
.annotate(count=aggregate_func)
.order_by("-count")
)
return process_grouped_data(data)
def build_simple_chart_response(
queryset: QuerySet, id_field: str, name_field: str, aggregate_func: Aggregate
) -> List[Dict[str, Any]]:
data = (
queryset.annotate(
key=F(id_field), display_name=F(name_field) if name_field else F(id_field)
)
.values("key", "display_name")
.annotate(count=aggregate_func)
.order_by("key")
)
return [
{
"key": item["key"] if item["key"] else "None",
"name": item["display_name"] if item["display_name"] else "None",
"count": item["count"],
}
for item in data
]
def build_analytics_chart(
queryset: QuerySet[Issue],
x_axis: str,
group_by: Optional[str] = None,
date_filter: Optional[str] = None,
) -> Dict[str, Union[List[Dict[str, Any]], Dict[str, str]]]:
# Validate x_axis
if x_axis not in x_axis_mapper:
raise ValidationError(f"Invalid x_axis field: {x_axis}")
# Validate group_by
if group_by and group_by not in x_axis_mapper:
raise ValidationError(f"Invalid group_by field: {group_by}")
field_mapping = get_x_axis_field()
id_field, name_field, additional_filter = field_mapping.get(
x_axis, (None, None, {})
)
group_field, group_name_field, group_additional_filter = field_mapping.get(
group_by, (None, None, {})
)
# Apply additional filters if they exist
if additional_filter or {}:
queryset = queryset.filter(**additional_filter)
if group_additional_filter or {}:
queryset = queryset.filter(**group_additional_filter)
aggregate_func = Count("id", distinct=True)
if group_field:
response, schema = build_grouped_chart_response(
queryset,
id_field,
name_field,
group_field,
group_name_field,
aggregate_func,
)
else:
response = build_simple_chart_response(
queryset, id_field, name_field, aggregate_func
)
schema = {}
return {"data": response, "schema": schema}

View file

@ -0,0 +1,98 @@
# Python imports
from functools import wraps
# Django imports
from django.conf import settings
from django.core.cache import cache
# Third party imports
from rest_framework.response import Response
def generate_cache_key(custom_path, auth_header=None):
"""Generate a cache key with the given params"""
if auth_header:
key_data = f"{custom_path}:{auth_header}"
else:
key_data = custom_path
return key_data
def cache_response(timeout=60 * 60, path=None, user=True):
"""decorator to create cache per user"""
def decorator(view_func):
@wraps(view_func)
def _wrapped_view(instance, request, *args, **kwargs):
# Function to generate cache key
auth_header = (
None
if request.user.is_anonymous
else str(request.user.id)
if user
else None
)
custom_path = path if path is not None else request.get_full_path()
key = generate_cache_key(custom_path, auth_header)
cached_result = cache.get(key)
if cached_result is not None:
return Response(cached_result["data"], status=cached_result["status"])
response = view_func(instance, request, *args, **kwargs)
if response.status_code == 200 and not settings.DEBUG:
cache.set(
key,
{"data": response.data, "status": response.status_code},
timeout,
)
return response
return _wrapped_view
return decorator
def invalidate_cache_directly(
path=None, url_params=False, user=True, request=None, multiple=False
):
if url_params and path:
path_with_values = path
# Assuming `kwargs` could be passed directly if needed, otherwise, skip this part
for key, value in request.resolver_match.kwargs.items():
path_with_values = path_with_values.replace(f":{key}", str(value))
custom_path = path_with_values
else:
custom_path = path if path is not None else request.get_full_path()
auth_header = (
None
if request and request.user.is_anonymous
else str(request.user.id)
if user
else None
)
key = generate_cache_key(custom_path, auth_header)
if multiple:
cache.delete_many(keys=cache.keys(f"*{key}*"))
else:
cache.delete(key)
def invalidate_cache(path=None, url_params=False, user=True, multiple=False):
def decorator(view_func):
@wraps(view_func)
def _wrapped_view(instance, request, *args, **kwargs):
# invalidate the cache
invalidate_cache_directly(
path=path,
url_params=url_params,
user=user,
request=request,
multiple=multiple,
)
return view_func(instance, request, *args, **kwargs)
return _wrapped_view
return decorator

View file

@ -0,0 +1,67 @@
RESTRICTED_WORKSPACE_SLUGS = [
"404",
"accounts",
"api",
"create-workspace",
"god-mode",
"installations",
"invitations",
"onboarding",
"profile",
"spaces",
"workspace-invitations",
"password",
"flags",
"monitor",
"monitoring",
"ingest",
"plane-pro",
"plane-ultimate",
"enterprise",
"plane-enterprise",
"disco",
"silo",
"chat",
"calendar",
"drive",
"channels",
"upgrade",
"billing",
"sign-in",
"sign-up",
"signin",
"signup",
"config",
"live",
"admin",
"m",
"import",
"importers",
"integrations",
"integration",
"configuration",
"initiatives",
"initiative",
"config",
"workflow",
"workflows",
"epics",
"epic",
"story",
"mobile",
"dashboard",
"desktop",
"onload",
"real-time",
"one",
"pages",
"mobile",
"business",
"pro",
"settings",
"monitor",
"license",
"licenses",
"instances",
"instance",
]

View file

@ -0,0 +1,201 @@
from datetime import datetime, timedelta, date
from django.utils import timezone
from typing import Dict, Optional, List, Union, Tuple, Any
from plane.db.models import User
def get_analytics_date_range(
date_filter: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> Optional[Dict[str, Dict[str, datetime]]]:
"""
Get date range for analytics with current and previous periods for comparison.
Returns a dictionary with current and previous date ranges.
Args:
date_filter (str): The type of date filter to apply
start_date (str): Start date for custom range (format: YYYY-MM-DD)
end_date (str): End date for custom range (format: YYYY-MM-DD)
Returns:
dict: Dictionary containing current and previous date ranges
"""
if not date_filter:
return None
today = timezone.now().date()
if date_filter == "yesterday":
yesterday = today - timedelta(days=1)
return {
"current": {
"gte": datetime.combine(yesterday, datetime.min.time()),
"lte": datetime.combine(yesterday, datetime.max.time()),
}
}
elif date_filter == "last_7_days":
return {
"current": {
"gte": datetime.combine(today - timedelta(days=7), datetime.min.time()),
"lte": datetime.combine(today, datetime.max.time()),
},
"previous": {
"gte": datetime.combine(
today - timedelta(days=14), datetime.min.time()
),
"lte": datetime.combine(today - timedelta(days=8), datetime.max.time()),
},
}
elif date_filter == "last_30_days":
return {
"current": {
"gte": datetime.combine(
today - timedelta(days=30), datetime.min.time()
),
"lte": datetime.combine(today, datetime.max.time()),
},
"previous": {
"gte": datetime.combine(
today - timedelta(days=60), datetime.min.time()
),
"lte": datetime.combine(
today - timedelta(days=31), datetime.max.time()
),
},
}
elif date_filter == "last_3_months":
return {
"current": {
"gte": datetime.combine(
today - timedelta(days=90), datetime.min.time()
),
"lte": datetime.combine(today, datetime.max.time()),
},
"previous": {
"gte": datetime.combine(
today - timedelta(days=180), datetime.min.time()
),
"lte": datetime.combine(
today - timedelta(days=91), datetime.max.time()
),
},
}
elif date_filter == "custom" and start_date and end_date:
try:
start = datetime.strptime(start_date, "%Y-%m-%d").date()
end = datetime.strptime(end_date, "%Y-%m-%d").date()
return {
"current": {
"gte": datetime.combine(start, datetime.min.time()),
"lte": datetime.combine(end, datetime.max.time()),
}
}
except (ValueError, TypeError):
return None
return None
def get_chart_period_range(
date_filter: Optional[str] = None,
) -> Optional[Tuple[date, date]]:
"""
Get date range for chart visualization.
Returns a tuple of (start_date, end_date) for the specified period.
Args:
date_filter (str): The type of date filter to apply. Options are:
- "yesterday": Yesterday's date
- "last_7_days": Last 7 days
- "last_30_days": Last 30 days
- "last_3_months": Last 90 days
Defaults to "last_7_days" if not specified or invalid.
Returns:
tuple: A tuple containing (start_date, end_date) as date objects
"""
if not date_filter:
return None
today = timezone.now().date()
period_ranges = {
"yesterday": (
today - timedelta(days=1),
today - timedelta(days=1),
),
"last_7_days": (today - timedelta(days=7), today),
"last_30_days": (today - timedelta(days=30), today),
"last_3_months": (today - timedelta(days=90), today),
}
return period_ranges.get(date_filter, None)
def get_analytics_filters(
slug: str,
user: User,
type: str,
date_filter: Optional[str] = None,
project_ids: Optional[Union[str, List[str]]] = None,
) -> Dict[str, Any]:
"""
Get combined project and date filters for analytics endpoints
Args:
slug: The workspace slug
user: The current user
type: The type of filter ("analytics" or "chart")
date_filter: Optional date filter string
project_ids: Optional list of project IDs or comma-separated string of project IDs
Returns:
dict: A dictionary containing:
- base_filters: Base filters for the workspace and user
- project_filters: Project-specific filters
- analytics_date_range: Date range filters for analytics comparison
- chart_period_range: Date range for chart visualization
"""
# Get project IDs from request
if project_ids and isinstance(project_ids, str):
project_ids = [str(project_id) for project_id in project_ids.split(",")]
# Base filters for workspace and user
base_filters = {
"workspace__slug": slug,
"project__project_projectmember__member": user,
"project__project_projectmember__is_active": True,
"project__deleted_at__isnull": True,
"project__archived_at__isnull": True,
}
# Project filters
project_filters = {
"workspace__slug": slug,
"project_projectmember__member": user,
"project_projectmember__is_active": True,
"deleted_at__isnull": True,
"archived_at__isnull": True,
}
# Add project IDs to filters if provided
if project_ids:
base_filters["project_id__in"] = project_ids
project_filters["id__in"] = project_ids
# Initialize date range variables
analytics_date_range = None
chart_period_range = None
# Get date range filters based on type
if type == "analytics":
analytics_date_range = get_analytics_date_range(date_filter)
elif type == "chart":
chart_period_range = get_chart_period_range(date_filter)
return {
"base_filters": base_filters,
"project_filters": project_filters,
"analytics_date_range": analytics_date_range,
"chart_period_range": chart_period_range,
}

View file

@ -0,0 +1,10 @@
ERROR_CODES = {
# issues
"INVALID_ARCHIVE_STATE_GROUP": 4091,
"INVALID_ISSUE_DATES": 4100,
"INVALID_ISSUE_START_DATE": 4101,
"INVALID_ISSUE_TARGET_DATE": 4102,
# pages
"PAGE_LOCKED": 4701,
"PAGE_ARCHIVED": 4702,
}

View file

@ -0,0 +1,18 @@
# Python imports
import logging
import traceback
# Django imports
from django.conf import settings
def log_exception(e):
# Log the error
logger = logging.getLogger("plane.exception")
logger.exception(e)
if settings.DEBUG:
# Print the traceback if in debug mode
print(traceback.format_exc())
return

View file

@ -0,0 +1,83 @@
# python imports
from math import ceil
# constants
PAGINATOR_MAX_LIMIT = 1000
class PaginateCursor:
def __init__(self, current_page_size: int, current_page: int, offset: int):
self.current_page_size = current_page_size
self.current_page = current_page
self.offset = offset
def __str__(self):
return f"{self.current_page_size}:{self.current_page}:{self.offset}"
@classmethod
def from_string(self, value):
"""Return the cursor value from string format"""
try:
bits = value.split(":")
if len(bits) != 3:
raise ValueError("Cursor must be in the format 'value:offset:is_prev'")
return self(int(bits[0]), int(bits[1]), int(bits[2]))
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid cursor format: {e}")
def paginate(base_queryset, queryset, cursor, on_result):
# validating for cursor
if cursor is None:
cursor_object = PaginateCursor(PAGINATOR_MAX_LIMIT, 0, 0)
else:
cursor_object = PaginateCursor.from_string(cursor)
# getting the issues count
total_results = base_queryset.count()
page_size = min(cursor_object.current_page_size, PAGINATOR_MAX_LIMIT)
# getting the total pages available based on the page size
total_pages = ceil(total_results / page_size)
# Calculate the start and end index for the paginated data
start_index = 0
if cursor_object.current_page > 0:
start_index = cursor_object.current_page * page_size
end_index = min(start_index + page_size, total_results)
# Get the paginated data
paginated_data = queryset[start_index:end_index]
# Create the pagination info object
prev_cursor = f"{page_size}:{cursor_object.current_page-1}:0"
cursor = f"{page_size}:{cursor_object.current_page}:0"
next_cursor = None
if end_index < total_results:
next_cursor = f"{page_size}:{cursor_object.current_page+1}:0"
prev_page_results = False
if cursor_object.current_page > 0:
prev_page_results = True
next_page_results = False
if next_cursor:
next_page_results = True
if on_result:
paginated_data = on_result(paginated_data)
# returning the result
paginated_data = {
"prev_cursor": prev_cursor,
"cursor": cursor,
"next_cursor": next_cursor,
"prev_page_results": prev_page_results,
"next_page_results": next_page_results,
"page_count": len(paginated_data),
"total_results": total_results,
"total_pages": total_pages,
"results": paginated_data,
}
return paginated_data

View file

@ -0,0 +1,220 @@
# Django imports
from django.contrib.postgres.aggregates import ArrayAgg
from django.contrib.postgres.fields import ArrayField
from django.db.models import Q, UUIDField, Value, QuerySet
from django.db.models.functions import Coalesce
# Module imports
from plane.db.models import (
Cycle,
Issue,
Label,
Module,
Project,
ProjectMember,
State,
WorkspaceMember,
)
from typing import Optional, Dict, Tuple, Any, Union, List
def issue_queryset_grouper(
queryset: QuerySet[Issue],
group_by: Optional[str],
sub_group_by: Optional[str],
) -> QuerySet[Issue]:
FIELD_MAPPER: Dict[str, str] = {
"label_ids": "labels__id",
"assignee_ids": "assignees__id",
"module_ids": "issue_module__module_id",
}
GROUP_FILTER_MAPPER: Dict[str, Q] = {
"assignees__id": Q(issue_assignee__deleted_at__isnull=True),
"labels__id": Q(label_issue__deleted_at__isnull=True),
"issue_module__module_id": Q(issue_module__deleted_at__isnull=True),
}
for group_key in [group_by, sub_group_by]:
if group_key in GROUP_FILTER_MAPPER:
queryset = queryset.filter(GROUP_FILTER_MAPPER[group_key])
annotations_map: Dict[str, Tuple[str, Q]] = {
"assignee_ids": (
"assignees__id",
~Q(assignees__id__isnull=True) & Q(issue_assignee__deleted_at__isnull=True),
),
"label_ids": (
"labels__id",
~Q(labels__id__isnull=True) & Q(label_issue__deleted_at__isnull=True),
),
"module_ids": (
"issue_module__module_id",
(
~Q(issue_module__module_id__isnull=True)
& Q(issue_module__module__archived_at__isnull=True)
& Q(issue_module__deleted_at__isnull=True)
),
),
}
default_annotations: Dict[str, Any] = {
key: Coalesce(
ArrayAgg(field, distinct=True, filter=condition),
Value([], output_field=ArrayField(UUIDField())),
)
for key, (field, condition) in annotations_map.items()
if FIELD_MAPPER.get(key) != group_by or FIELD_MAPPER.get(key) != sub_group_by
}
return queryset.annotate(**default_annotations)
def issue_on_results(
issues: QuerySet[Issue],
group_by: Optional[str],
sub_group_by: Optional[str],
) -> List[Dict[str, Any]]:
FIELD_MAPPER: Dict[str, str] = {
"labels__id": "label_ids",
"assignees__id": "assignee_ids",
"issue_module__module_id": "module_ids",
}
original_list: List[str] = ["assignee_ids", "label_ids", "module_ids"]
required_fields: List[str] = [
"id",
"name",
"state_id",
"sort_order",
"completed_at",
"estimate_point",
"priority",
"start_date",
"target_date",
"sequence_id",
"project_id",
"parent_id",
"cycle_id",
"sub_issues_count",
"created_at",
"updated_at",
"created_by",
"updated_by",
"attachment_count",
"link_count",
"is_draft",
"archived_at",
"state__group",
]
if group_by in FIELD_MAPPER:
original_list.remove(FIELD_MAPPER[group_by])
original_list.append(group_by)
if sub_group_by in FIELD_MAPPER:
original_list.remove(FIELD_MAPPER[sub_group_by])
original_list.append(sub_group_by)
required_fields.extend(original_list)
return list(issues.values(*required_fields))
def issue_group_values(
field: str,
slug: str,
project_id: Optional[str] = None,
filters: Dict[str, Any] = {},
) -> List[Union[str, Any]]:
if field == "state_id":
queryset = State.objects.filter(
is_triage=False, workspace__slug=slug
).values_list("id", flat=True)
if project_id:
return list(queryset.filter(project_id=project_id))
return list(queryset)
if field == "labels__id":
queryset = Label.objects.filter(workspace__slug=slug).values_list(
"id", flat=True
)
if project_id:
return list(queryset.filter(project_id=project_id)) + ["None"]
return list(queryset) + ["None"]
if field == "assignees__id":
if project_id:
return list(
ProjectMember.objects.filter(
workspace__slug=slug, project_id=project_id, is_active=True
).values_list("member_id", flat=True)
)
return list(
WorkspaceMember.objects.filter(
workspace__slug=slug, is_active=True
).values_list("member_id", flat=True)
)
if field == "issue_module__module_id":
queryset = Module.objects.filter(workspace__slug=slug).values_list(
"id", flat=True
)
if project_id:
return list(queryset.filter(project_id=project_id)) + ["None"]
return list(queryset) + ["None"]
if field == "cycle_id":
queryset = Cycle.objects.filter(workspace__slug=slug).values_list(
"id", flat=True
)
if project_id:
return list(queryset.filter(project_id=project_id)) + ["None"]
return list(queryset) + ["None"]
if field == "project_id":
queryset = Project.objects.filter(workspace__slug=slug).values_list(
"id", flat=True
)
return list(queryset)
if field == "priority":
return ["low", "medium", "high", "urgent", "none"]
if field == "state__group":
return ["backlog", "unstarted", "started", "completed", "cancelled"]
if field == "target_date":
queryset = (
Issue.issue_objects.filter(workspace__slug=slug)
.filter(**filters)
.values_list("target_date", flat=True)
.distinct()
)
if project_id:
return list(queryset.filter(project_id=project_id))
return list(queryset)
if field == "start_date":
queryset = (
Issue.issue_objects.filter(workspace__slug=slug)
.filter(**filters)
.values_list("start_date", flat=True)
.distinct()
)
if project_id:
return list(queryset.filter(project_id=project_id))
return list(queryset)
if field == "created_by":
queryset = (
Issue.issue_objects.filter(workspace__slug=slug)
.filter(**filters)
.values_list("created_by", flat=True)
.distinct()
)
if project_id:
return list(queryset.filter(project_id=project_id))
return list(queryset)
return []

View file

@ -0,0 +1,67 @@
# Django imports
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.http import HttpRequest
# Third party imports
from rest_framework.request import Request
# Module imports
from plane.utils.ip_address import get_client_ip
def base_host(
request: Request | HttpRequest,
is_admin: bool = False,
is_space: bool = False,
is_app: bool = False,
) -> str:
"""Utility function to return host / origin from the request"""
# Calculate the base origin from request
base_origin = settings.WEB_URL or settings.APP_BASE_URL
if not base_origin:
raise ImproperlyConfigured("APP_BASE_URL or WEB_URL is not set")
# Admin redirection
if is_admin:
admin_base_path = getattr(settings, "ADMIN_BASE_PATH", None)
if not isinstance(admin_base_path, str):
admin_base_path = "/god-mode/"
if not admin_base_path.startswith("/"):
admin_base_path = "/" + admin_base_path
if not admin_base_path.endswith("/"):
admin_base_path += "/"
if settings.ADMIN_BASE_URL:
return settings.ADMIN_BASE_URL + admin_base_path
else:
return base_origin + admin_base_path
# Space redirection
if is_space:
space_base_path = getattr(settings, "SPACE_BASE_PATH", None)
if not isinstance(space_base_path, str):
space_base_path = "/spaces/"
if not space_base_path.startswith("/"):
space_base_path = "/" + space_base_path
if not space_base_path.endswith("/"):
space_base_path += "/"
if settings.SPACE_BASE_URL:
return settings.SPACE_BASE_URL + space_base_path
else:
return base_origin + space_base_path
# App Redirection
if is_app:
if settings.APP_BASE_URL:
return settings.APP_BASE_URL
else:
return base_origin
return base_origin
def user_ip(request: Request | HttpRequest) -> str:
return get_client_ip(request=request)

View file

@ -0,0 +1,27 @@
from io import StringIO
from html.parser import HTMLParser
class MLStripper(HTMLParser):
"""
Markup Language Stripper
"""
def __init__(self):
super().__init__()
self.reset()
self.strict = False
self.convert_charrefs = True
self.text = StringIO()
def handle_data(self, d):
self.text.write(d)
def get_data(self):
return self.text.getvalue()
def strip_tags(html):
s = MLStripper()
s.feed(html)
return s.get_data()

View file

@ -0,0 +1,17 @@
import pkgutil
import six
def import_submodules(context, root_module, path):
"""
Import all submodules and register them in the ``context`` namespace.
>>> import_submodules(locals(), __name__, __path__)
"""
for loader, module_name, is_pkg in pkgutil.walk_packages(path, root_module + "."):
# this causes a Runtime error with model conflicts
# module = loader.find_module(module_name).load_module(module_name)
module = __import__(module_name, globals(), locals(), ["__name__"])
for k, v in six.iteritems(vars(module)):
if not k.startswith("_"):
context[k] = v
context[module_name] = module

View file

@ -0,0 +1,7 @@
def get_client_ip(request):
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
else:
ip = request.META.get("REMOTE_ADDR")
return ip

View file

@ -0,0 +1,545 @@
import re
import uuid
from datetime import timedelta
from django.utils import timezone
# The date from pattern
pattern = re.compile(r"\d+_(weeks|months)$")
# check the valid uuids
def filter_valid_uuids(uuid_list):
valid_uuids = []
for uuid_str in uuid_list:
try:
uuid_obj = uuid.UUID(uuid_str)
valid_uuids.append(uuid_obj)
except ValueError:
# ignore the invalid uuids
pass
return valid_uuids
# Get the 2_weeks, 3_months
def string_date_filter(issue_filter, duration, subsequent, term, date_filter, offset):
now = timezone.now().date()
if term == "months":
if subsequent == "after":
if offset == "fromnow":
issue_filter[f"{date_filter}__gte"] = now + timedelta(
days=duration * 30
)
else:
issue_filter[f"{date_filter}__gte"] = now - timedelta(
days=duration * 30
)
else:
if offset == "fromnow":
issue_filter[f"{date_filter}__lte"] = now + timedelta(
days=duration * 30
)
else:
issue_filter[f"{date_filter}__lte"] = now - timedelta(
days=duration * 30
)
if term == "weeks":
if subsequent == "after":
if offset == "fromnow":
issue_filter[f"{date_filter}__gte"] = now + timedelta(weeks=duration)
else:
issue_filter[f"{date_filter}__gte"] = now - timedelta(weeks=duration)
else:
if offset == "fromnow":
issue_filter[f"{date_filter}__lte"] = now + timedelta(weeks=duration)
else:
issue_filter[f"{date_filter}__lte"] = now - timedelta(weeks=duration)
def date_filter(issue_filter, date_term, queries):
"""
Handle all date filters
"""
for query in queries:
date_query = query.split(";")
if date_query:
if len(date_query) >= 2:
match = pattern.match(date_query[0])
if match:
if len(date_query) == 3:
digit, term = date_query[0].split("_")
string_date_filter(
issue_filter=issue_filter,
duration=int(digit),
subsequent=date_query[1],
term=term,
date_filter=date_term,
offset=date_query[2],
)
else:
if "after" in date_query:
issue_filter[f"{date_term}__gte"] = date_query[0]
else:
issue_filter[f"{date_term}__lte"] = date_query[0]
else:
issue_filter[f"{date_term}__contains"] = date_query[0]
def filter_state(params, issue_filter, method, prefix=""):
if method == "GET":
states = [item for item in params.get("state").split(",") if item != "null"]
states = filter_valid_uuids(states)
if len(states) and "" not in states:
issue_filter[f"{prefix}state__in"] = states
else:
if (
params.get("state", None)
and len(params.get("state"))
and params.get("state") != "null"
):
issue_filter[f"{prefix}state__in"] = params.get("state")
return issue_filter
def filter_state_group(params, issue_filter, method, prefix=""):
if method == "GET":
state_group = [
item for item in params.get("state_group").split(",") if item != "null"
]
if len(state_group) and "" not in state_group:
issue_filter[f"{prefix}state__group__in"] = state_group
else:
if (
params.get("state_group", None)
and len(params.get("state_group"))
and params.get("state_group") != "null"
):
issue_filter[f"{prefix}state__group__in"] = params.get("state_group")
return issue_filter
def filter_estimate_point(params, issue_filter, method, prefix=""):
if method == "GET":
estimate_points = [
item for item in params.get("estimate_point").split(",") if item != "null"
]
if len(estimate_points) and "" not in estimate_points:
issue_filter[f"{prefix}estimate_point__in"] = estimate_points
else:
if (
params.get("estimate_point", None)
and len(params.get("estimate_point"))
and params.get("estimate_point") != "null"
):
issue_filter[f"{prefix}estimate_point__in"] = params.get("estimate_point")
return issue_filter
def filter_priority(params, issue_filter, method, prefix=""):
if method == "GET":
priorities = [
item for item in params.get("priority").split(",") if item != "null"
]
if len(priorities) and "" not in priorities:
issue_filter[f"{prefix}priority__in"] = priorities
else:
if (
params.get("priority", None)
and len(params.get("priority"))
and params.get("priority") != "null"
):
issue_filter[f"{prefix}priority__in"] = params.get("priority")
return issue_filter
def filter_parent(params, issue_filter, method, prefix=""):
if method == "GET":
parents = [item for item in params.get("parent").split(",") if item != "null"]
if "None" in parents:
issue_filter[f"{prefix}parent__isnull"] = True
parents = filter_valid_uuids(parents)
if len(parents) and "" not in parents:
issue_filter[f"{prefix}parent__in"] = parents
else:
if (
params.get("parent", None)
and len(params.get("parent"))
and params.get("parent") != "null"
):
issue_filter[f"{prefix}parent__in"] = params.get("parent")
return issue_filter
def filter_labels(params, issue_filter, method, prefix=""):
if method == "GET":
labels = [item for item in params.get("labels").split(",") if item != "null"]
if "None" in labels:
issue_filter[f"{prefix}labels__isnull"] = True
labels = filter_valid_uuids(labels)
if len(labels) and "" not in labels:
issue_filter[f"{prefix}labels__in"] = labels
else:
if (
params.get("labels", None)
and len(params.get("labels"))
and params.get("labels") != "null"
):
issue_filter[f"{prefix}labels__in"] = params.get("labels")
issue_filter[f"{prefix}label_issue__deleted_at__isnull"] = True
return issue_filter
def filter_assignees(params, issue_filter, method, prefix=""):
if method == "GET":
assignees = [
item for item in params.get("assignees").split(",") if item != "null"
]
if "None" in assignees:
issue_filter[f"{prefix}assignees__isnull"] = True
assignees = filter_valid_uuids(assignees)
if len(assignees) and "" not in assignees:
issue_filter[f"{prefix}assignees__in"] = assignees
else:
if (
params.get("assignees", None)
and len(params.get("assignees"))
and params.get("assignees") != "null"
):
issue_filter[f"{prefix}assignees__in"] = params.get("assignees")
issue_filter[f"{prefix}issue_assignee__deleted_at__isnull"] = True
return issue_filter
def filter_mentions(params, issue_filter, method, prefix=""):
if method == "GET":
mentions = [
item for item in params.get("mentions").split(",") if item != "null"
]
mentions = filter_valid_uuids(mentions)
if len(mentions) and "" not in mentions:
issue_filter[f"{prefix}issue_mention__mention__id__in"] = mentions
else:
if (
params.get("mentions", None)
and len(params.get("mentions"))
and params.get("mentions") != "null"
):
issue_filter[f"{prefix}issue_mention__mention__id__in"] = params.get(
"mentions"
)
return issue_filter
def filter_created_by(params, issue_filter, method, prefix=""):
if method == "GET":
created_bys = [
item for item in params.get("created_by").split(",") if item != "null"
]
if "None" in created_bys:
issue_filter[f"{prefix}created_by__isnull"] = True
created_bys = filter_valid_uuids(created_bys)
if len(created_bys) and "" not in created_bys:
issue_filter[f"{prefix}created_by__in"] = created_bys
else:
if (
params.get("created_by", None)
and len(params.get("created_by"))
and params.get("created_by") != "null"
):
issue_filter[f"{prefix}created_by__in"] = params.get("created_by")
return issue_filter
def filter_name(params, issue_filter, method, prefix=""):
if params.get("name", "") != "":
issue_filter[f"{prefix}name__icontains"] = params.get("name")
return issue_filter
def filter_created_at(params, issue_filter, method, prefix=""):
if method == "GET":
created_ats = params.get("created_at").split(",")
if len(created_ats) and "" not in created_ats:
date_filter(
issue_filter=issue_filter,
date_term=f"{prefix}created_at__date",
queries=created_ats,
)
else:
if params.get("created_at", None) and len(params.get("created_at")):
date_filter(
issue_filter=issue_filter,
date_term=f"{prefix}created_at__date",
queries=params.get("created_at", []),
)
return issue_filter
def filter_updated_at(params, issue_filter, method, prefix=""):
if method == "GET":
updated_ats = params.get("updated_at").split(",")
if len(updated_ats) and "" not in updated_ats:
date_filter(
issue_filter=issue_filter,
date_term=f"{prefix}created_at__date",
queries=updated_ats,
)
else:
if params.get("updated_at", None) and len(params.get("updated_at")):
date_filter(
issue_filter=issue_filter,
date_term=f"{prefix}created_at__date",
queries=params.get("updated_at", []),
)
return issue_filter
def filter_start_date(params, issue_filter, method, prefix=""):
if method == "GET":
start_dates = params.get("start_date").split(",")
if len(start_dates) and "" not in start_dates:
date_filter(
issue_filter=issue_filter,
date_term=f"{prefix}start_date",
queries=start_dates,
)
else:
if params.get("start_date", None) and len(params.get("start_date")):
issue_filter[f"{prefix}start_date"] = params.get("start_date")
return issue_filter
def filter_target_date(params, issue_filter, method, prefix=""):
if method == "GET":
target_dates = params.get("target_date").split(",")
if len(target_dates) and "" not in target_dates:
date_filter(
issue_filter=issue_filter,
date_term=f"{prefix}target_date",
queries=target_dates,
)
else:
if params.get("target_date", None) and len(params.get("target_date")):
issue_filter[f"{prefix}target_date"] = params.get("target_date")
return issue_filter
def filter_completed_at(params, issue_filter, method, prefix=""):
if method == "GET":
completed_ats = params.get("completed_at").split(",")
if len(completed_ats) and "" not in completed_ats:
date_filter(
issue_filter=issue_filter,
date_term=f"{prefix}completed_at__date",
queries=completed_ats,
)
else:
if params.get("completed_at", None) and len(params.get("completed_at")):
date_filter(
issue_filter=issue_filter,
date_term=f"{prefix}completed_at__date",
queries=params.get("completed_at", []),
)
return issue_filter
def filter_issue_state_type(params, issue_filter, method, prefix=""):
type = params.get("type", "all")
group = ["backlog", "unstarted", "started", "completed", "cancelled"]
if type == "backlog":
group = ["backlog"]
if type == "active":
group = ["unstarted", "started"]
issue_filter[f"{prefix}state__group__in"] = group
return issue_filter
def filter_project(params, issue_filter, method, prefix=""):
if method == "GET":
projects = [item for item in params.get("project").split(",") if item != "null"]
projects = filter_valid_uuids(projects)
if len(projects) and "" not in projects:
issue_filter[f"{prefix}project__in"] = projects
else:
if (
params.get("project", None)
and len(params.get("project"))
and params.get("project") != "null"
):
issue_filter[f"{prefix}project__in"] = params.get("project")
return issue_filter
def filter_cycle(params, issue_filter, method, prefix=""):
if method == "GET":
cycles = [item for item in params.get("cycle").split(",") if item != "null"]
if "None" in cycles:
issue_filter[f"{prefix}issue_cycle__cycle_id__isnull"] = True
cycles = filter_valid_uuids(cycles)
if len(cycles) and "" not in cycles:
issue_filter[f"{prefix}issue_cycle__cycle_id__in"] = cycles
else:
if (
params.get("cycle", None)
and len(params.get("cycle"))
and params.get("cycle") != "null"
):
issue_filter[f"{prefix}issue_cycle__cycle_id__in"] = params.get("cycle")
issue_filter[f"{prefix}issue_cycle__deleted_at__isnull"] = True
return issue_filter
def filter_module(params, issue_filter, method, prefix=""):
if method == "GET":
modules = [item for item in params.get("module").split(",") if item != "null"]
if "None" in modules:
issue_filter[f"{prefix}issue_module__module_id__isnull"] = True
modules = filter_valid_uuids(modules)
if len(modules) and "" not in modules:
issue_filter[f"{prefix}issue_module__module_id__in"] = modules
else:
if (
params.get("module", None)
and len(params.get("module"))
and params.get("module") != "null"
):
issue_filter[f"{prefix}issue_module__module_id__in"] = params.get("module")
issue_filter[f"{prefix}issue_module__deleted_at__isnull"] = True
return issue_filter
def filter_intake_status(params, issue_filter, method, prefix=""):
if method == "GET":
status = [
item for item in params.get("intake_status").split(",") if item != "null"
]
if len(status) and "" not in status:
issue_filter[f"{prefix}issue_intake__status__in"] = status
else:
if (
params.get("intake_status", None)
and len(params.get("intake_status"))
and params.get("intake_status") != "null"
):
issue_filter[f"{prefix}issue_intake__status__in"] = params.get(
"inbox_status"
)
return issue_filter
def filter_inbox_status(params, issue_filter, method, prefix=""):
if method == "GET":
status = [
item for item in params.get("inbox_status").split(",") if item != "null"
]
if len(status) and "" not in status:
issue_filter[f"{prefix}issue_intake__status__in"] = status
else:
if (
params.get("inbox_status", None)
and len(params.get("inbox_status"))
and params.get("inbox_status") != "null"
):
issue_filter[f"{prefix}issue_intake__status__in"] = params.get(
"inbox_status"
)
return issue_filter
def filter_sub_issue_toggle(params, issue_filter, method, prefix=""):
if method == "GET":
sub_issue = params.get("sub_issue", "false")
if sub_issue == "false":
issue_filter[f"{prefix}parent__isnull"] = True
else:
sub_issue = params.get("sub_issue", "false")
if sub_issue == "false":
issue_filter[f"{prefix}parent__isnull"] = True
return issue_filter
def filter_subscribed_issues(params, issue_filter, method, prefix=""):
if method == "GET":
subscribers = [
item for item in params.get("subscriber").split(",") if item != "null"
]
subscribers = filter_valid_uuids(subscribers)
if len(subscribers) and "" not in subscribers:
issue_filter[f"{prefix}issue_subscribers__subscriber_id__in"] = subscribers
else:
if (
params.get("subscriber", None)
and len(params.get("subscriber"))
and params.get("subscriber") != "null"
):
issue_filter[f"{prefix}issue_subscribers__subscriber_id__in"] = params.get(
"subscriber"
)
return issue_filter
def filter_start_target_date_issues(params, issue_filter, method, prefix=""):
start_target_date = params.get("start_target_date", "false")
if start_target_date == "true":
issue_filter[f"{prefix}target_date__isnull"] = False
issue_filter[f"{prefix}start_date__isnull"] = False
return issue_filter
def filter_logged_by(params, issue_filter, method, prefix=""):
if method == "GET":
logged_bys = [
item for item in params.get("logged_by").split(",") if item != "null"
]
if "None" in logged_bys:
issue_filter[f"{prefix}logged_by__isnull"] = True
logged_bys = filter_valid_uuids(logged_bys)
if len(logged_bys) and "" not in logged_bys:
issue_filter[f"{prefix}logged_by__in"] = logged_bys
else:
if (
params.get("logged_by", None)
and len(params.get("logged_by"))
and params.get("logged_by") != "null"
):
issue_filter[f"{prefix}logged_by__in"] = params.get("logged_by")
return issue_filter
def issue_filters(query_params, method, prefix=""):
issue_filter = {}
ISSUE_FILTER = {
"state": filter_state,
"state_group": filter_state_group,
"estimate_point": filter_estimate_point,
"priority": filter_priority,
"parent": filter_parent,
"labels": filter_labels,
"assignees": filter_assignees,
"mentions": filter_mentions,
"created_by": filter_created_by,
"logged_by": filter_logged_by,
"name": filter_name,
"created_at": filter_created_at,
"updated_at": filter_updated_at,
"start_date": filter_start_date,
"target_date": filter_target_date,
"completed_at": filter_completed_at,
"type": filter_issue_state_type,
"project": filter_project,
"cycle": filter_cycle,
"module": filter_module,
"intake_status": filter_intake_status,
"inbox_status": filter_inbox_status,
"sub_issue": filter_sub_issue_toggle,
"subscriber": filter_subscribed_issues,
"start_target_date": filter_start_target_date_issues,
}
for key, value in ISSUE_FILTER.items():
if key in query_params:
func = value
func(query_params, issue_filter, method, prefix)
return issue_filter

View file

@ -0,0 +1,24 @@
def get_inverse_relation(relation_type):
relation_mapping = {
"start_after": "start_before",
"finish_after": "finish_before",
"blocked_by": "blocking",
"blocking": "blocked_by",
"start_before": "start_after",
"finish_before": "finish_after",
}
return relation_mapping.get(relation_type, relation_type)
def get_actual_relation(relation_type):
# This function is used to get the actual relation type which is store in database
actual_relation = {
"start_after": "start_before",
"finish_after": "finish_before",
"blocking": "blocked_by",
"blocked_by": "blocked_by",
"start_before": "start_before",
"finish_before": "finish_before",
}
return actual_relation.get(relation_type, relation_type)

View file

@ -0,0 +1,20 @@
# Python imports
import re
# Django imports
from django.db.models import Q
# Module imports
def search_issues(query, queryset):
fields = ["name", "sequence_id", "project__identifier"]
q = Q()
for field in fields:
if field == "sequence_id" and len(query) <= 20:
sequences = re.findall(r"\b\d+\b", query)
for sequence_id in sequences:
q |= Q(**{"sequence_id": sequence_id})
else:
q |= Q(**{f"{field}__icontains": query})
return queryset.filter(q).distinct()

View file

@ -0,0 +1,46 @@
import logging.handlers as handlers
import time
class SizedTimedRotatingFileHandler(handlers.TimedRotatingFileHandler):
"""
Handler for logging to a set of files, which switches from one file
to the next when the current file reaches a certain size, or at certain
timed intervals
"""
def __init__(
self,
filename,
maxBytes=0,
backupCount=0,
encoding=None,
delay=0,
when="h",
interval=1,
utc=False,
):
handlers.TimedRotatingFileHandler.__init__(
self, filename, when, interval, backupCount, encoding, delay, utc
)
self.maxBytes = maxBytes
def shouldRollover(self, record):
"""
Determine if rollover should occur.
Basically, see if the supplied record would cause the file to exceed
the size limit we have.
"""
if self.stream is None: # delay was set...
self.stream = self._open()
if self.maxBytes > 0: # are we rolling over?
msg = "%s\n" % self.format(record)
# due to non-posix-compliant Windows feature
self.stream.seek(0, 2)
if self.stream.tell() + len(msg) >= self.maxBytes:
return 1
t = int(time.time())
if t >= self.rolloverAt:
return 1
return 0

View file

@ -0,0 +1,3 @@
import mistune
markdown = mistune.Markdown()

View file

@ -0,0 +1,72 @@
from django.db.models import Case, CharField, Min, Value, When
# Custom ordering for priority and state
PRIORITY_ORDER = ["urgent", "high", "medium", "low", "none"]
STATE_ORDER = ["backlog", "unstarted", "started", "completed", "cancelled"]
def order_issue_queryset(issue_queryset, order_by_param="-created_at"):
# Priority Ordering
if order_by_param == "priority" or order_by_param == "-priority":
issue_queryset = issue_queryset.annotate(
priority_order=Case(
*[
When(priority=p, then=Value(i))
for i, p in enumerate(PRIORITY_ORDER)
],
output_field=CharField(),
)
).order_by("priority_order", "-created_at")
order_by_param = (
"priority_order" if order_by_param.startswith("-") else "-priority_order"
)
# State Ordering
elif order_by_param in ["state__group", "-state__group"]:
state_order = (
STATE_ORDER
if order_by_param in ["state__name", "state__group"]
else STATE_ORDER[::-1]
)
issue_queryset = issue_queryset.annotate(
state_order=Case(
*[
When(state__group=state_group, then=Value(i))
for i, state_group in enumerate(state_order)
],
default=Value(len(state_order)),
output_field=CharField(),
)
).order_by("state_order", "-created_at")
order_by_param = (
"-state_order" if order_by_param.startswith("-") else "state_order"
)
# assignee and label ordering
elif order_by_param in [
"labels__name",
"assignees__first_name",
"issue_module__module__name",
"-labels__name",
"-assignees__first_name",
"-issue_module__module__name",
]:
issue_queryset = issue_queryset.annotate(
min_values=Min(
order_by_param[1::]
if order_by_param.startswith("-")
else order_by_param
)
).order_by(
"-min_values" if order_by_param.startswith("-") else "min_values",
"-created_at",
)
order_by_param = (
"-min_values" if order_by_param.startswith("-") else "min_values"
)
else:
# If the order_by_param is created_at, then don't add the -created_at
if "created_at" in order_by_param:
issue_queryset = issue_queryset.order_by(order_by_param)
else:
issue_queryset = issue_queryset.order_by(order_by_param, "-created_at")
order_by_param = order_by_param
return issue_queryset, order_by_param

View file

@ -0,0 +1,780 @@
# Python imports
import math
from collections import defaultdict
from collections.abc import Sequence
# Django imports
from django.db.models import Count, F, Window
from django.db.models.functions import RowNumber
# Third party imports
from rest_framework.exceptions import ParseError
from rest_framework.response import Response
# Module imports
class Cursor:
# The cursor value
def __init__(self, value, offset=0, is_prev=False, has_results=None):
self.value = value
self.offset = int(offset)
self.is_prev = bool(is_prev)
self.has_results = has_results
# Return the cursor value in string format
def __str__(self):
return f"{self.value}:{self.offset}:{int(self.is_prev)}"
# Return the cursor value
def __eq__(self, other):
return all(
getattr(self, attr) == getattr(other, attr)
for attr in ("value", "offset", "is_prev", "has_results")
)
# Return the representation of the cursor
def __repr__(self):
return f"{type(self).__name__,}: value={self.value} offset={self.offset}, is_prev={int(self.is_prev)}"
# Return if the cursor is true
def __bool__(self):
return bool(self.has_results)
@classmethod
def from_string(cls, value):
"""Return the cursor value from string format"""
try:
bits = value.split(":")
if len(bits) != 3:
raise ValueError("Cursor must be in the format 'value:offset:is_prev'")
value = float(bits[0]) if "." in bits[0] else int(bits[0])
return cls(value, int(bits[1]), bool(int(bits[2])))
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid cursor format: {e}")
class CursorResult(Sequence):
def __init__(self, results, next, prev, hits=None, max_hits=None):
self.results = results
self.next = next
self.prev = prev
self.hits = hits
self.max_hits = max_hits
def __len__(self):
# Return the length of the results
return len(self.results)
def __iter__(self):
# Return the iterator of the results
return iter(self.results)
def __getitem__(self, key):
# Return the results based on the key
return self.results[key]
def __repr__(self):
# Return the representation of the results
return f"<{type(self).__name__}: results={len(self.results)}>"
MAX_LIMIT = 1000
class BadPaginationError(Exception):
pass
class OffsetPaginator:
"""
The Offset paginator using the offset and limit
with cursor controls
http://example.com/api/users/?cursor=10.0.0&per_page=10
cursor=limit,offset=page,
"""
def __init__(
self,
queryset,
order_by=None,
max_limit=MAX_LIMIT,
max_offset=None,
on_results=None,
total_count_queryset=None,
):
# Key tuple and remove `-` if descending order by
self.key = (
order_by
if order_by is None or isinstance(order_by, (list, tuple, set))
else (order_by[1::] if order_by.startswith("-") else order_by,)
)
# Set desc to true when `-` exists in the order by
self.desc = True if order_by and order_by.startswith("-") else False
self.queryset = queryset
self.max_limit = max_limit
self.max_offset = max_offset
self.on_results = on_results
self.total_count_queryset = total_count_queryset
def get_result(self, limit=1000, cursor=None):
# offset is page #
# value is page limit
if cursor is None:
cursor = Cursor(0, 0, 0)
# Get the min from limit and max limit
limit = min(limit, self.max_limit)
# queryset
queryset = self.queryset
if self.key:
queryset = queryset.order_by(
(
F(*self.key).desc(nulls_last=True)
if self.desc
else F(*self.key).asc(nulls_last=True)
),
"-created_at",
)
# The current page
page = cursor.offset
# The offset - use limit instead of cursor.value for consistent pagination
offset = cursor.offset * limit
stop = offset + limit + 1
if self.max_offset is not None and offset >= self.max_offset:
raise BadPaginationError("Pagination offset too large")
if offset < 0:
raise BadPaginationError("Pagination offset cannot be negative")
results = queryset[offset:stop]
# Duplicate the queryset so it does not evaluate on any python ops
page_results = queryset[offset:stop].values("id")
# Only slice from the end if we're going backwards (previous page)
if cursor.value != limit and cursor.is_prev:
results = results[-(limit + 1) :]
total_count = (
self.total_count_queryset.count()
if self.total_count_queryset
else results.count()
)
# Check if there are more results available after the current page
# Adjust cursors based on the results for pagination
next_cursor = Cursor(limit, page + 1, False, page_results.count() > limit)
# If the page is greater than 0, then set the previous cursor
prev_cursor = Cursor(limit, page - 1, True, page > 0)
# Process the results
results = results[:limit]
# Process the results
if self.on_results:
results = self.on_results(results)
# Count the queryset
count = total_count
# Optionally, calculate the total count and max_hits if needed
max_hits = math.ceil(count / limit)
# Return the cursor results
return CursorResult(
results=results,
next=next_cursor,
prev=prev_cursor,
hits=count,
max_hits=max_hits,
)
def process_results(self, results):
raise NotImplementedError
class GroupedOffsetPaginator(OffsetPaginator):
# Field mappers - list m2m fields here
FIELD_MAPPER = {
"labels__id": "label_ids",
"assignees__id": "assignee_ids",
"issue_module__module_id": "module_ids",
}
def __init__(
self,
queryset,
group_by_field_name,
group_by_fields,
count_filter,
total_count_queryset=None,
*args,
**kwargs,
):
# Initiate the parent class for all the parameters
super().__init__(queryset, *args, **kwargs)
# Set the group by field name
self.group_by_field_name = group_by_field_name
# Set the group by fields
self.group_by_fields = group_by_fields
# Set the count filter - this are extra filters that need to be passed to calculate the counts with the filters
self.count_filter = count_filter
def get_result(self, limit=50, cursor=None):
# offset is page #
# value is page limit
if cursor is None:
cursor = Cursor(0, 0, 0)
limit = min(limit, self.max_limit)
# Adjust the initial offset and stop based on the cursor and limit
queryset = self.queryset
page = cursor.offset
offset = cursor.offset * cursor.value
stop = offset + (cursor.value or limit) + 1
# Check if the offset is greater than the max offset
if self.max_offset is not None and offset >= self.max_offset:
raise BadPaginationError("Pagination offset too large")
# Check if the offset is less than 0
if offset < 0:
raise BadPaginationError("Pagination offset cannot be negative")
# Compute the results
results = {}
# Create window for all the groups
queryset = queryset.annotate(
row_number=Window(
expression=RowNumber(),
partition_by=[F(self.group_by_field_name)],
order_by=(
(
F(*self.key).desc(
nulls_last=True
) # order by desc if desc is set
if self.desc
else F(*self.key).asc(nulls_last=True) # Order by asc if set
),
F("created_at").desc(),
),
)
)
# Filter the results by row number
results = queryset.filter(row_number__gt=offset, row_number__lt=stop).order_by(
(
F(*self.key).desc(nulls_last=True)
if self.desc
else F(*self.key).asc(nulls_last=True)
),
F("created_at").desc(),
)
# Adjust cursors based on the grouped results for pagination
next_cursor = Cursor(
limit, page + 1, False, queryset.filter(row_number__gte=stop).exists()
)
# Add previous cursors
prev_cursor = Cursor(limit, page - 1, True, page > 0)
# Count the queryset
count = queryset.count()
# Optionally, calculate the total count and max_hits if needed
# This might require adjustments based on specific use cases
if results:
max_hits = math.ceil(
queryset.values(self.group_by_field_name)
.annotate(count=Count("id", filter=self.count_filter, distinct=True))
.order_by("-count")[0]["count"]
/ limit
)
else:
max_hits = 0
return CursorResult(
results=results,
next=next_cursor,
prev=prev_cursor,
hits=count,
max_hits=max_hits,
)
def __get_total_queryset(self):
# Get total items for each group
return (
self.queryset.values(self.group_by_field_name)
.annotate(count=Count("id", filter=self.count_filter, distinct=True))
.order_by()
)
def __get_total_dict(self):
# Convert the total into dictionary of keys as group name and value as the total
total_group_dict = {}
for group in self.__get_total_queryset():
total_group_dict[str(group.get(self.group_by_field_name))] = (
total_group_dict.get(str(group.get(self.group_by_field_name)), 0)
+ (1 if group.get("count") == 0 else group.get("count"))
)
return total_group_dict
def __get_field_dict(self):
# Create a field dictionary
total_group_dict = self.__get_total_dict()
return {
str(field): {
"results": [],
"total_results": total_group_dict.get(str(field), 0),
}
for field in self.group_by_fields
}
def __result_already_added(self, result, group):
# Check if the result is already added then add it
for existing_issue in group:
if existing_issue["id"] == result["id"]:
return True
return False
def __query_multi_grouper(self, results):
# Grouping for m2m values
total_group_dict = self.__get_total_dict()
# Preparing a dict to keep track of group IDs associated with each entity ID
result_group_mapping = defaultdict(set)
# Preparing a dict to group result by group ID
grouped_by_field_name = defaultdict(list)
# Iterate over results to fill the above dictionaries
for result in results:
result_id = result["id"]
group_id = result[self.group_by_field_name]
result_group_mapping[str(result_id)].add(str(group_id))
# Adding group_ids key to each issue and grouping by group_name
for result in results:
result_id = result["id"]
group_ids = list(result_group_mapping[str(result_id)])
result[self.FIELD_MAPPER.get(self.group_by_field_name)] = (
[] if "None" in group_ids else group_ids
)
# If a result belongs to multiple groups, add it to each group
for group_id in group_ids:
if not self.__result_already_added(
result, grouped_by_field_name[group_id]
):
grouped_by_field_name[group_id].append(result)
# Convert grouped_by_field_name back to a list for each group
processed_results = {
str(group_id): {
"results": issues,
"total_results": total_group_dict.get(str(group_id)),
}
for group_id, issues in grouped_by_field_name.items()
}
return processed_results
def __query_grouper(self, results):
# Grouping for values that are not m2m
processed_results = self.__get_field_dict()
for result in results:
group_value = str(result.get(self.group_by_field_name))
if group_value in processed_results:
processed_results[str(group_value)]["results"].append(result)
return processed_results
def process_results(self, results):
# Process results
if results:
if self.group_by_field_name in self.FIELD_MAPPER:
processed_results = self.__query_multi_grouper(results=results)
else:
processed_results = self.__query_grouper(results=results)
else:
processed_results = {}
return processed_results
class SubGroupedOffsetPaginator(OffsetPaginator):
# Field mappers this are the fields that are m2m
FIELD_MAPPER = {
"labels__id": "label_ids",
"assignees__id": "assignee_ids",
"issue_module__module_id": "module_ids",
}
def __init__(
self,
queryset,
group_by_field_name,
sub_group_by_field_name,
group_by_fields,
sub_group_by_fields,
count_filter,
total_count_queryset=None,
*args,
**kwargs,
):
# Initiate the parent class for all the parameters
super().__init__(queryset, *args, **kwargs)
# Set the group by field name
self.group_by_field_name = group_by_field_name
self.group_by_fields = group_by_fields
# Set the sub group by field name
self.sub_group_by_field_name = sub_group_by_field_name
self.sub_group_by_fields = sub_group_by_fields
# Set the count filter - this are extra filters that need to be passed to calculate the counts with the filters
self.count_filter = count_filter
def get_result(self, limit=30, cursor=None):
# offset is page #
# value is page limit
if cursor is None:
cursor = Cursor(0, 0, 0)
# get the minimum value
limit = min(limit, self.max_limit)
# Adjust the initial offset and stop based on the cursor and limit
queryset = self.queryset
# the current page
page = cursor.offset
# the offset
offset = cursor.offset * cursor.value
# the stop
stop = offset + (cursor.value or limit) + 1
if self.max_offset is not None and offset >= self.max_offset:
raise BadPaginationError("Pagination offset too large")
if offset < 0:
raise BadPaginationError("Pagination offset cannot be negative")
# Compute the results
results = {}
# Create windows for group and sub group field name
queryset = queryset.annotate(
row_number=Window(
expression=RowNumber(),
partition_by=[
F(self.group_by_field_name),
F(self.sub_group_by_field_name),
],
order_by=(
(
F(*self.key).desc(nulls_last=True)
if self.desc
else F(*self.key).asc(nulls_last=True)
),
"-created_at",
),
)
)
# Filter the results
results = queryset.filter(row_number__gt=offset, row_number__lt=stop).order_by(
(
F(*self.key).desc(nulls_last=True)
if self.desc
else F(*self.key).asc(nulls_last=True)
),
F("created_at").desc(),
)
# Adjust cursors based on the grouped results for pagination
next_cursor = Cursor(
limit, page + 1, False, queryset.filter(row_number__gte=stop).exists()
)
# Add previous cursors
prev_cursor = Cursor(limit, page - 1, True, page > 0)
# Count the queryset
count = queryset.count()
# Optionally, calculate the total count and max_hits if needed
# This might require adjustments based on specific use cases
if results:
max_hits = math.ceil(
queryset.values(self.group_by_field_name)
.annotate(count=Count("id", filter=self.count_filter, distinct=True))
.order_by("-count")[0]["count"]
/ limit
)
else:
max_hits = 0
return CursorResult(
results=results,
next=next_cursor,
prev=prev_cursor,
hits=count,
max_hits=max_hits,
)
def __get_group_total_queryset(self):
# Get group totals
return (
self.queryset.order_by(self.group_by_field_name)
.values(self.group_by_field_name)
.annotate(count=Count("id", filter=self.count_filter, distinct=True))
.distinct()
)
def __get_subgroup_total_queryset(self):
# Get subgroup totals
return (
self.queryset.values(self.group_by_field_name, self.sub_group_by_field_name)
.annotate(count=Count("id", filter=self.count_filter, distinct=True))
.order_by()
.values(self.group_by_field_name, self.sub_group_by_field_name, "count")
)
def __get_total_dict(self):
# Use the above to convert to dictionary of 2D objects
total_group_dict = {}
total_sub_group_dict = {}
for group in self.__get_group_total_queryset():
total_group_dict[str(group.get(self.group_by_field_name))] = (
total_group_dict.get(str(group.get(self.group_by_field_name)), 0)
+ (1 if group.get("count") == 0 else group.get("count"))
)
# Sub group total values
for item in self.__get_subgroup_total_queryset():
group = str(item[self.group_by_field_name])
subgroup = str(item[self.sub_group_by_field_name])
count = item["count"]
# Create a dictionary of group and sub group
if group not in total_sub_group_dict:
total_sub_group_dict[str(group)] = {}
# Create a dictionary of sub group
if subgroup not in total_sub_group_dict[group]:
total_sub_group_dict[str(group)][str(subgroup)] = {}
# Create a nested dictionary of group and sub group
total_sub_group_dict[group][subgroup] = count
return total_group_dict, total_sub_group_dict
def __get_field_dict(self):
# Create a field dictionary
total_group_dict, total_sub_group_dict = self.__get_total_dict()
# Create a dictionary of group and sub group
return {
str(group): {
"results": {
str(sub_group): {
"results": [],
"total_results": total_sub_group_dict.get(str(group)).get(
str(sub_group), 0
),
}
for sub_group in total_sub_group_dict.get(str(group), [])
},
"total_results": total_group_dict.get(str(group), 0),
}
for group in self.group_by_fields
}
def __query_multi_grouper(self, results):
# Multi grouper
processed_results = self.__get_field_dict()
# Preparing a dict to keep track of group IDs associated with each label ID
result_group_mapping = defaultdict(set)
result_sub_group_mapping = defaultdict(set)
# Iterate over results to fill the above dictionaries
if self.group_by_field_name in self.FIELD_MAPPER:
for result in results:
result_id = result["id"]
group_id = result[self.group_by_field_name]
result_group_mapping[str(result_id)].add(str(group_id))
# Use the same calculation for the sub group
if self.sub_group_by_field_name in self.FIELD_MAPPER:
for result in results:
result_id = result["id"]
sub_group_id = result[self.sub_group_by_field_name]
result_sub_group_mapping[str(result_id)].add(str(sub_group_id))
# Iterate over results
for result in results:
# Get the group value
group_value = str(result.get(self.group_by_field_name))
# Get the sub group value
sub_group_value = str(result.get(self.sub_group_by_field_name))
# Check if the group value is in the processed results
result_id = result["id"]
if (
group_value in processed_results
and sub_group_value in processed_results[str(group_value)]["results"]
):
if self.group_by_field_name in self.FIELD_MAPPER:
# for multi grouper
group_ids = list(result_group_mapping[str(result_id)])
result[self.FIELD_MAPPER.get(self.group_by_field_name)] = (
[] if "None" in group_ids else group_ids
)
if self.sub_group_by_field_name in self.FIELD_MAPPER:
sub_group_ids = list(result_sub_group_mapping[str(result_id)])
# for multi groups
result[self.FIELD_MAPPER.get(self.sub_group_by_field_name)] = (
[] if "None" in sub_group_ids else sub_group_ids
)
# If a result belongs to multiple groups, add it to each group
processed_results[str(group_value)]["results"][str(sub_group_value)][
"results"
].append(result)
return processed_results
def __query_grouper(self, results):
# Single grouper
processed_results = self.__get_field_dict()
for result in results:
group_value = str(result.get(self.group_by_field_name))
sub_group_value = str(result.get(self.sub_group_by_field_name))
processed_results[group_value]["results"][sub_group_value][
"results"
].append(result)
return processed_results
def process_results(self, results):
if results:
if (
self.group_by_field_name in self.FIELD_MAPPER
or self.sub_group_by_field_name in self.FIELD_MAPPER
):
# if the grouping is done through m2m then
processed_results = self.__query_multi_grouper(results=results)
else:
# group it directly
processed_results = self.__query_grouper(results=results)
else:
processed_results = {}
return processed_results
class BasePaginator:
"""BasePaginator class can be inherited by any View to return a paginated view"""
# cursor query parameter name
cursor_name = "cursor"
# get the per page parameter from request
def get_per_page(self, request, default_per_page=1000, max_per_page=1000):
try:
per_page = int(request.GET.get("per_page", default_per_page))
except ValueError:
raise ParseError(detail="Invalid per_page parameter.")
max_per_page = max(max_per_page, default_per_page)
if per_page > max_per_page:
raise ParseError(
detail=f"Invalid per_page value. Cannot exceed {max_per_page}."
)
return per_page
def paginate(
self,
request,
on_results=None,
paginator=None,
paginator_cls=OffsetPaginator,
default_per_page=1000,
max_per_page=1000,
cursor_cls=Cursor,
extra_stats=None,
controller=None,
group_by_field_name=None,
group_by_fields=None,
sub_group_by_field_name=None,
sub_group_by_fields=None,
count_filter=None,
total_count_queryset=None,
**paginator_kwargs,
):
"""Paginate the request"""
per_page = self.get_per_page(request, default_per_page, max_per_page)
# Convert the cursor value to integer and float from string
input_cursor = None
try:
input_cursor = cursor_cls.from_string(
request.GET.get(self.cursor_name, f"{per_page}:0:0")
)
except ValueError:
raise ParseError(detail="Invalid cursor parameter.")
if not paginator:
if group_by_field_name:
paginator_kwargs["group_by_field_name"] = group_by_field_name
paginator_kwargs["group_by_fields"] = group_by_fields
paginator_kwargs["count_filter"] = count_filter
if sub_group_by_field_name:
paginator_kwargs["sub_group_by_field_name"] = (
sub_group_by_field_name
)
paginator_kwargs["sub_group_by_fields"] = sub_group_by_fields
paginator_kwargs["total_count_queryset"] = total_count_queryset
paginator = paginator_cls(**paginator_kwargs)
try:
cursor_result = paginator.get_result(limit=per_page, cursor=input_cursor)
except BadPaginationError:
raise ParseError(detail="Error in parsing")
if on_results:
results = on_results(cursor_result.results)
else:
results = cursor_result.results
if group_by_field_name:
results = paginator.process_results(results=results)
# Add Manipulation functions to the response
if controller is not None:
results = controller(results)
else:
results = results
# Return the response
response = Response(
{
"grouped_by": group_by_field_name,
"sub_grouped_by": sub_group_by_field_name,
"total_count": (cursor_result.hits),
"next_cursor": str(cursor_result.next),
"prev_cursor": str(cursor_result.prev),
"next_page_results": cursor_result.next.has_results,
"prev_page_results": cursor_result.prev.has_results,
"count": cursor_result.__len__(),
"total_pages": cursor_result.max_hits,
"total_results": cursor_result.hits,
"extra_stats": extra_stats,
"results": results,
}
)
return response

View file

@ -0,0 +1,21 @@
# Python imports
from urllib.parse import urlparse
def validate_next_path(next_path: str) -> str:
"""Validates that next_path is a valid path and extracts only the path component."""
parsed_url = urlparse(next_path)
# Ensure next_path is not an absolute URL
if parsed_url.scheme or parsed_url.netloc:
next_path = parsed_url.path # Extract only the path component
# Ensure it starts with a forward slash (indicating a valid relative path)
if not next_path.startswith("/"):
return ""
# Ensure it does not contain dangerous path traversal sequences
if ".." in next_path:
return ""
return next_path

View file

@ -0,0 +1,58 @@
# Python imports
import os
import atexit
# Third party imports
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.django import DjangoInstrumentor
# Global variable to track initialization
_TRACER_PROVIDER = None
def init_tracer():
"""Initialize OpenTelemetry with proper shutdown handling"""
global _TRACER_PROVIDER
# If already initialized, return existing provider
if _TRACER_PROVIDER is not None:
return _TRACER_PROVIDER
# Configure the tracer provider
service_name = os.environ.get("SERVICE_NAME", "plane-ce-api")
resource = Resource.create({"service.name": service_name})
tracer_provider = TracerProvider(resource=resource)
# Set as global tracer provider
trace.set_tracer_provider(tracer_provider)
# Configure the OTLP exporter
otel_endpoint = os.environ.get("OTLP_ENDPOINT", "https://telemetry.plane.so")
otlp_exporter = OTLPSpanExporter(endpoint=otel_endpoint)
span_processor = BatchSpanProcessor(otlp_exporter)
tracer_provider.add_span_processor(span_processor)
# Initialize Django instrumentation
DjangoInstrumentor().instrument()
# Store provider globally
_TRACER_PROVIDER = tracer_provider
# Register shutdown handler
atexit.register(shutdown_tracer)
return tracer_provider
def shutdown_tracer():
"""Shutdown OpenTelemetry tracers and processors"""
global _TRACER_PROVIDER
if _TRACER_PROVIDER is not None:
if hasattr(_TRACER_PROVIDER, "shutdown"):
_TRACER_PROVIDER.shutdown()
_TRACER_PROVIDER = None

View file

@ -0,0 +1,121 @@
# Python imports
import pytz
from datetime import datetime, time
from datetime import timedelta
# Django imports
from django.utils import timezone
# Module imports
from plane.db.models import Project
def user_timezone_converter(queryset, datetime_fields, user_timezone):
# Create a timezone object for the user's timezone
user_tz = pytz.timezone(user_timezone)
# Check if queryset is a dictionary (single item) or a list of dictionaries
if isinstance(queryset, dict):
queryset_values = [queryset]
else:
queryset_values = list(queryset)
# Iterate over the dictionaries in the list
for item in queryset_values:
# Iterate over the datetime fields
for field in datetime_fields:
# Convert the datetime field to the user's timezone
if field in item and item[field]:
item[field] = item[field].astimezone(user_tz)
# If queryset was a single item, return a single item
if isinstance(queryset, dict):
return queryset_values[0]
else:
return queryset_values
def convert_to_utc(date, project_id, is_start_date=False):
"""
Converts a start date string to the project's local timezone at 12:00 AM
and then converts it to UTC for storage.
Args:
date (str): The date string in "YYYY-MM-DD" format.
project_id (int): The project's ID to fetch the associated timezone.
Returns:
datetime: The UTC datetime.
"""
# Retrieve the project's timezone using the project ID
project = Project.objects.get(id=project_id)
project_timezone = project.timezone
if not date or not project_timezone:
raise ValueError("Both date and timezone must be provided.")
# Parse the string into a date object
start_date = datetime.strptime(date, "%Y-%m-%d").date()
# Get the project's timezone
local_tz = pytz.timezone(project_timezone)
# Combine the date with 12:00 AM time
local_datetime = datetime.combine(start_date, time.min)
# Localize the datetime to the project's timezone
localized_datetime = local_tz.localize(local_datetime)
# If it's an start date, add one minute
if is_start_date:
localized_datetime += timedelta(minutes=0, seconds=1)
# Convert the localized datetime to UTC
utc_datetime = localized_datetime.astimezone(pytz.utc)
current_datetime_in_project_tz = timezone.now().astimezone(local_tz)
current_datetime_in_utc = current_datetime_in_project_tz.astimezone(pytz.utc)
if localized_datetime.date() == current_datetime_in_project_tz.date():
return current_datetime_in_utc
return utc_datetime
else:
# the cycle end date is the last minute of the day
localized_datetime += timedelta(hours=23, minutes=59, seconds=0)
# Convert the localized datetime to UTC
utc_datetime = localized_datetime.astimezone(pytz.utc)
# Return the UTC datetime for storage
return utc_datetime
def convert_utc_to_project_timezone(utc_datetime, project_id):
"""
Converts a UTC datetime (stored in the database) to the project's local timezone.
Args:
utc_datetime (datetime): The UTC datetime to be converted.
project_id (int): The project's ID to fetch the associated timezone.
Returns:
datetime: The datetime in the project's local timezone.
"""
# Retrieve the project's timezone using the project ID
project = Project.objects.get(id=project_id)
project_timezone = project.timezone
if not project_timezone:
raise ValueError("Project timezone must be provided.")
# Get the timezone object for the project's timezone
local_tz = pytz.timezone(project_timezone)
# Convert the UTC datetime to the project's local timezone
if utc_datetime.tzinfo is None:
# Localize UTC datetime if it's naive (i.e., without timezone info)
utc_datetime = pytz.utc.localize(utc_datetime)
# Convert to the project's local timezone
local_datetime = utc_datetime.astimezone(local_tz)
return local_datetime

View file

@ -0,0 +1,87 @@
# Python imports
import re
from typing import Optional
from urllib.parse import urlparse, urlunparse
def contains_url(value: str) -> bool:
"""
Check if the value contains a URL.
"""
url_pattern = re.compile(r"https?://|www\\.")
return bool(url_pattern.search(value))
def is_valid_url(url: str) -> bool:
"""
Validates whether the given string is a well-formed URL.
Args:
url (str): The URL string to validate.
Returns:
bool: True if the URL is valid, False otherwise.
Example:
>>> is_valid_url("https://example.com")
True
>>> is_valid_url("not a url")
False
"""
try:
result = urlparse(url)
# A valid URL should have at least scheme and netloc
return all([result.scheme, result.netloc])
except TypeError:
return False
def get_url_components(url: str) -> Optional[dict]:
"""
Parses the URL and returns its components if valid.
Args:
url (str): The URL string to parse.
Returns:
Optional[dict]: A dictionary with URL components if valid, None otherwise.
Example:
>>> get_url_components("https://example.com/path?query=1")
{'scheme': 'https', 'netloc': 'example.com', 'path': '/path', 'params': '', 'query': 'query=1', 'fragment': ''}
"""
if not is_valid_url(url):
return None
result = urlparse(url)
return {
"scheme": result.scheme,
"netloc": result.netloc,
"path": result.path,
"params": result.params,
"query": result.query,
"fragment": result.fragment,
}
def normalize_url_path(url: str) -> str:
"""
Normalize the path component of a URL by replacing multiple consecutive slashes with a single slash.
This function preserves the protocol, domain, query parameters, and fragments of the URL,
only modifying the path portion to ensure there are no duplicate slashes.
Args:
url (str): The input URL string to normalize.
Returns:
str: The normalized URL with redundant slashes in the path removed.
Example:
>>> normalize_url_path('https://example.com//foo///bar//baz?x=1#frag')
'https://example.com/foo/bar/baz?x=1#frag'
"""
parts = urlparse(url)
# Normalize the path
normalized_path = re.sub(r"/+", "/", parts.path)
# Reconstruct the URL
return urlunparse(parts._replace(path=normalized_path))

View file

@ -0,0 +1,22 @@
# Python imports
import uuid
import hashlib
def is_valid_uuid(uuid_str):
"""Check if a string is a valid UUID version 4"""
try:
uuid_obj = uuid.UUID(uuid_str)
return uuid_obj.version == 4
except ValueError:
return False
def convert_uuid_to_integer(uuid_val: uuid.UUID) -> int:
"""Convert a UUID to a 64-bit signed integer"""
# Ensure UUID is a string
uuid_value: str = str(uuid_val)
# Hash to 64-bit signed int
h: bytes = hashlib.sha256(uuid_value.encode()).digest()
bigint: int = int.from_bytes(h[:8], byteorder="big", signed=True)
return bigint