[WEB-3782] chore: analytics endpoints (#6973)

* chore: analytics endpoint

* chore: created analytics chart

* chore: validation errors

* chore: added a new graph in advance analytics

* chore: added csv exporter

* chore: updated the filtering logic for analytics

* chore: optimised the analytics endpoint

* chore: updated the base function for viewsets

* chore: updated the export logic

* chore: added type hints

* chore: added type hints
Bavisetti Narayan 2025-05-12 13:15:17 +05:30 committed by GitHub
parent 13c46e0fdf
commit b435ceedfc
6 changed files with 859 additions and 0 deletions

@@ -6,8 +6,12 @@ from plane.app.views import (
AnalyticViewViewset,
SavedAnalyticEndpoint,
ExportAnalyticsEndpoint,
AdvanceAnalyticsEndpoint,
AdvanceAnalyticsStatsEndpoint,
AdvanceAnalyticsChartEndpoint,
DefaultAnalyticsEndpoint,
ProjectStatsEndpoint,
AdvanceAnalyticsExportEndpoint,
)
@@ -49,4 +53,24 @@ urlpatterns = [
ProjectStatsEndpoint.as_view(),
name="project-analytics",
),
path(
"workspaces/<str:slug>/advance-analytics/",
AdvanceAnalyticsEndpoint.as_view(),
name="advance-analytics",
),
path(
"workspaces/<str:slug>/advance-analytics-stats/",
AdvanceAnalyticsStatsEndpoint.as_view(),
name="advance-analytics-stats",
),
path(
"workspaces/<str:slug>/advance-analytics-charts/",
AdvanceAnalyticsChartEndpoint.as_view(),
name="advance-analytics-chart",
),
path(
"workspaces/<str:slug>/advance-analytics-export/",
AdvanceAnalyticsExportEndpoint.as_view(),
name="advance-analytics-export",
),
]
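
The four routes above share the workspace slug and differ only in suffix; everything else is read from query parameters in the views below (`tab`, `type`, `date_filter`, `project_ids`, `x_axis`, `group_by`). A minimal client sketch — the host, `/api` prefix, token and project IDs are placeholders, only the parameter names come from this PR:

import requests

BASE = "http://localhost:8000/api"  # assumed deployment prefix
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder credential
SLUG = "my-workspace"

# Overview numbers (tab defaults to "overview" server-side)
overview = requests.get(
    f"{BASE}/workspaces/{SLUG}/advance-analytics/",
    params={"tab": "overview", "date_filter": "last_7_days"},
    headers=HEADERS,
).json()

# Per-project work-item stats, limited to two projects
stats = requests.get(
    f"{BASE}/workspaces/{SLUG}/advance-analytics-stats/",
    params={"type": "work-items", "project_ids": "123,456"},
    headers=HEADERS,
).json()

# Custom chart grouped by state group
chart = requests.get(
    f"{BASE}/workspaces/{SLUG}/advance-analytics-charts/",
    params={"type": "custom-work-items", "x_axis": "PRIORITY", "group_by": "STATE_GROUPS"},
    headers=HEADERS,
).json()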

@@ -199,6 +199,13 @@ from .analytic.base import (
ProjectStatsEndpoint,
)
from .analytic.advance import (
AdvanceAnalyticsEndpoint,
AdvanceAnalyticsStatsEndpoint,
AdvanceAnalyticsChartEndpoint,
AdvanceAnalyticsExportEndpoint,
)
from .notification.base import (
NotificationViewSet,
UnreadNotificationEndpoint,

@@ -0,0 +1,397 @@
from rest_framework.response import Response
from rest_framework import status
from typing import Dict, List, Any
from datetime import timedelta
from django.db.models import QuerySet, Q, Count
from django.http import HttpRequest
from plane.app.views.base import BaseAPIView
from plane.app.permissions import ROLE, allow_permission
from plane.db.models import (
WorkspaceMember,
Project,
Issue,
Cycle,
Module,
IssueView,
ProjectPage,
)
from plane.utils.build_chart import build_analytics_chart
from plane.bgtasks.analytic_plot_export import export_analytics_to_csv_email
from plane.utils.date_utils import get_analytics_filters
class AdvanceAnalyticsBaseView(BaseAPIView):
def initialize_workspace(self, slug: str, type: str) -> None:
self._workspace_slug = slug
self.filters = get_analytics_filters(
slug=slug,
type=type,
user=self.request.user,
date_filter=self.request.GET.get("date_filter", None),
project_ids=self.request.GET.get("project_ids", None),
)
class AdvanceAnalyticsEndpoint(AdvanceAnalyticsBaseView):
def get_filtered_counts(self, queryset: QuerySet) -> Dict[str, int]:
def get_filtered_count() -> int:
if self.filters["analytics_date_range"]:
return queryset.filter(
created_at__gte=self.filters["analytics_date_range"]["current"][
"gte"
],
created_at__lte=self.filters["analytics_date_range"]["current"][
"lte"
],
).count()
return queryset.count()
def get_previous_count() -> int:
if self.filters["analytics_date_range"] and self.filters[
"analytics_date_range"
].get("previous"):
return queryset.filter(
created_at__gte=self.filters["analytics_date_range"]["previous"][
"gte"
],
created_at__lte=self.filters["analytics_date_range"]["previous"][
"lte"
],
).count()
return 0
return {
"count": get_filtered_count(),
"filter_count": get_previous_count(),
}
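    # Illustrative shape (not part of the diff): "count" holds the current-period
    # total and "filter_count" the previous-period total used for comparison,
    # e.g. {"count": 42, "filter_count": 37}.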
def get_overview_data(self) -> Dict[str, Dict[str, int]]:
return {
"total_users": self.get_filtered_counts(
WorkspaceMember.objects.filter(
workspace__slug=self._workspace_slug, is_active=True
)
),
"total_admins": self.get_filtered_counts(
WorkspaceMember.objects.filter(
workspace__slug=self._workspace_slug,
role=ROLE.ADMIN.value,
is_active=True,
)
),
"total_members": self.get_filtered_counts(
WorkspaceMember.objects.filter(
workspace__slug=self._workspace_slug,
role=ROLE.MEMBER.value,
is_active=True,
)
),
"total_guests": self.get_filtered_counts(
WorkspaceMember.objects.filter(
workspace__slug=self._workspace_slug,
role=ROLE.GUEST.value,
is_active=True,
)
),
"total_projects": self.get_filtered_counts(
Project.objects.filter(**self.filters["project_filters"])
),
"total_work_items": self.get_filtered_counts(
Issue.issue_objects.filter(**self.filters["base_filters"])
),
"total_cycles": self.get_filtered_counts(
Cycle.objects.filter(**self.filters["base_filters"])
),
"total_intake": self.get_filtered_counts(
Issue.objects.filter(**self.filters["base_filters"]).filter(
issue_intake__isnull=False
)
),
}
def get_work_items_stats(self) -> Dict[str, Dict[str, int]]:
base_queryset = Issue.objects.filter(**self.filters["base_filters"])
return {
"total_work_items": self.get_filtered_counts(base_queryset),
"started_work_items": self.get_filtered_counts(
base_queryset.filter(state__group="started")
),
"backlog_work_items": self.get_filtered_counts(
base_queryset.filter(state__group="backlog")
),
"un_started_work_items": self.get_filtered_counts(
base_queryset.filter(state__group="unstarted")
),
"completed_work_items": self.get_filtered_counts(
base_queryset.filter(state__group="completed")
),
}
@allow_permission([ROLE.ADMIN, ROLE.MEMBER], level="WORKSPACE")
def get(self, request: HttpRequest, slug: str) -> Response:
self.initialize_workspace(slug, type="analytics")
tab = request.GET.get("tab", "overview")
if tab == "overview":
return Response(
self.get_overview_data(),
status=status.HTTP_200_OK,
)
elif tab == "work-items":
return Response(
self.get_work_items_stats(),
status=status.HTTP_200_OK,
)
return Response({"message": "Invalid tab"}, status=status.HTTP_400_BAD_REQUEST)
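# Example payloads (illustrative only): tab=overview maps keys such as
# "total_users", "total_projects" and "total_work_items" to
# {"count": <current period>, "filter_count": <previous period>}; tab=work-items
# returns the same pair for "total_work_items", "started_work_items",
# "backlog_work_items", "un_started_work_items" and "completed_work_items".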
class AdvanceAnalyticsStatsEndpoint(AdvanceAnalyticsBaseView):
def get_project_issues_stats(self) -> QuerySet:
# Get the base queryset with workspace and project filters
base_queryset = Issue.issue_objects.filter(**self.filters["base_filters"])
# Apply date range filter if available
if self.filters["chart_period_range"]:
start_date, end_date = self.filters["chart_period_range"]
base_queryset = base_queryset.filter(
created_at__date__gte=start_date, created_at__date__lte=end_date
)
return (
base_queryset.values("project_id", "project__name")
.annotate(
cancelled_work_items=Count("id", filter=Q(state__group="cancelled")),
completed_work_items=Count("id", filter=Q(state__group="completed")),
backlog_work_items=Count("id", filter=Q(state__group="backlog")),
un_started_work_items=Count("id", filter=Q(state__group="unstarted")),
started_work_items=Count("id", filter=Q(state__group="started")),
)
.order_by("project_id")
)
@allow_permission([ROLE.ADMIN, ROLE.MEMBER], level="WORKSPACE")
def get(self, request: HttpRequest, slug: str) -> Response:
self.initialize_workspace(slug, type="chart")
type = request.GET.get("type", "work-items")
if type == "work-items":
return Response(
self.get_project_issues_stats(),
status=status.HTTP_200_OK,
)
return Response({"message": "Invalid type"}, status=status.HTTP_400_BAD_REQUEST)
class AdvanceAnalyticsChartEndpoint(AdvanceAnalyticsBaseView):
def project_chart(self) -> List[Dict[str, Any]]:
# Get the base queryset with workspace and project filters
base_queryset = Issue.issue_objects.filter(**self.filters["base_filters"])
date_filter = {}
# Apply date range filter if available
if self.filters["chart_period_range"]:
start_date, end_date = self.filters["chart_period_range"]
date_filter = {
"created_at__date__gte": start_date,
"created_at__date__lte": end_date,
}
total_work_items = base_queryset.filter(**date_filter).count()
total_cycles = Cycle.objects.filter(
**self.filters["base_filters"], **date_filter
).count()
total_modules = Module.objects.filter(
**self.filters["base_filters"], **date_filter
).count()
total_intake = Issue.objects.filter(
issue_intake__isnull=False, **self.filters["base_filters"], **date_filter
).count()
total_members = WorkspaceMember.objects.filter(
workspace__slug=self._workspace_slug, is_active=True, **date_filter
).count()
total_pages = ProjectPage.objects.filter(
**self.filters["base_filters"], **date_filter
).count()
total_views = IssueView.objects.filter(
**self.filters["base_filters"], **date_filter
).count()
data = {
"work_items": total_work_items,
"cycles": total_cycles,
"modules": total_modules,
"intake": total_intake,
"members": total_members,
"pages": total_pages,
"views": total_views,
}
return [
{
"key": key,
"name": key.replace("_", " ").title(),
"count": value or 0,
}
for key, value in data.items()
]
def work_item_completion_chart(self) -> Dict[str, Any]:
# Get the base queryset
queryset = (
Issue.issue_objects.filter(**self.filters["base_filters"])
.select_related("workspace", "state", "parent")
.prefetch_related(
"assignees", "labels", "issue_module__module", "issue_cycle__cycle"
)
)
        # The completion chart needs a bounded period; return an empty payload
        # when no chart period range was resolved (no date_filter supplied)
        if not self.filters["chart_period_range"]:
            return {"data": [], "schema": {}}
        start_date, end_date = self.filters["chart_period_range"]
        queryset = queryset.filter(
            created_at__date__gte=start_date, created_at__date__lte=end_date
        )
# Get daily stats with optimized query
daily_stats = (
queryset.values("created_at__date")
.annotate(
created_count=Count("id"),
completed_count=Count("id", filter=Q(completed_at__isnull=False)),
)
.order_by("created_at__date")
)
# Create a dictionary of existing stats with summed counts
stats_dict = {
stat["created_at__date"].strftime("%Y-%m-%d"): {
"created_count": stat["created_count"],
"completed_count": stat["completed_count"],
}
for stat in daily_stats
}
# Generate data for all days in the range
data = []
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime("%Y-%m-%d")
stats = stats_dict.get(date_str, {"created_count": 0, "completed_count": 0})
data.append(
{
"key": date_str,
"name": date_str,
"count": stats["created_count"] + stats["completed_count"],
"completed_issues": stats["completed_count"],
"created_issues": stats["created_count"],
}
)
current_date += timedelta(days=1)
schema = {
"completed_issues": "completed_issues",
"created_issues": "created_issues",
}
return {"data": data, "schema": schema}
@allow_permission([ROLE.ADMIN, ROLE.MEMBER], level="WORKSPACE")
def get(self, request: HttpRequest, slug: str) -> Response:
self.initialize_workspace(slug, type="chart")
type = request.GET.get("type", "projects")
group_by = request.GET.get("group_by", None)
x_axis = request.GET.get("x_axis", "PRIORITY")
if type == "projects":
return Response(self.project_chart(), status=status.HTTP_200_OK)
elif type == "custom-work-items":
# Get the base queryset
queryset = (
Issue.issue_objects.filter(**self.filters["base_filters"])
.select_related("workspace", "state", "parent")
.prefetch_related(
"assignees", "labels", "issue_module__module", "issue_cycle__cycle"
)
)
# Apply date range filter if available
if self.filters["chart_period_range"]:
start_date, end_date = self.filters["chart_period_range"]
queryset = queryset.filter(
created_at__date__gte=start_date, created_at__date__lte=end_date
)
return Response(
build_analytics_chart(queryset, x_axis, group_by),
status=status.HTTP_200_OK,
)
elif type == "work-items":
return Response(
self.work_item_completion_chart(),
status=status.HTTP_200_OK,
)
return Response({"message": "Invalid type"}, status=status.HTTP_400_BAD_REQUEST)
class AdvanceAnalyticsExportEndpoint(AdvanceAnalyticsBaseView):
@allow_permission([ROLE.ADMIN, ROLE.MEMBER], level="WORKSPACE")
def post(self, request: HttpRequest, slug: str) -> Response:
self.initialize_workspace(slug, type="chart")
queryset = Issue.issue_objects.filter(**self.filters["base_filters"])
# Apply date range filter if available
if self.filters["chart_period_range"]:
start_date, end_date = self.filters["chart_period_range"]
queryset = queryset.filter(
created_at__date__gte=start_date, created_at__date__lte=end_date
)
queryset = (
queryset.values("project_id", "project__name")
.annotate(
cancelled_work_items=Count("id", filter=Q(state__group="cancelled")),
completed_work_items=Count("id", filter=Q(state__group="completed")),
backlog_work_items=Count("id", filter=Q(state__group="backlog")),
un_started_work_items=Count("id", filter=Q(state__group="unstarted")),
started_work_items=Count("id", filter=Q(state__group="started")),
)
.order_by("project_id")
)
# Convert QuerySet to list of dictionaries for serialization
serialized_data = list(queryset)
headers = [
"Projects",
"Completed Issues",
"Backlog Issues",
"Unstarted Issues",
"Started Issues",
]
keys = [
"project__name",
"completed_work_items",
"backlog_work_items",
"un_started_work_items",
"started_work_items",
]
email = request.user.email
# Send serialized data to background task
export_analytics_to_csv_email.delay(serialized_data, headers, keys, email, slug)
return Response(
{
"message": f"Once the export is ready it will be emailed to you at {str(email)}"
},
status=status.HTTP_200_OK,
)
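
The export endpoint does not return a file; it enqueues `export_analytics_to_csv_email` (next hunk) and responds immediately. A rough client sketch, with host, API prefix and token again assumed:

import requests

resp = requests.post(
    "http://localhost:8000/api/workspaces/my-workspace/advance-analytics-export/",
    params={"date_filter": "last_30_days", "project_ids": "123"},
    headers={"Authorization": "Bearer <token>"},  # placeholder credential
)
print(resp.json())  # {"message": "Once the export is ready it will be emailed to you at ..."}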

@@ -464,3 +464,32 @@ def analytic_export_task(email, data, slug):
except Exception as e:
log_exception(e)
return
@shared_task
def export_analytics_to_csv_email(data, headers, keys, email, slug):
    """
    Prepares a CSV from data and sends it as an email attachment.

    Parameters:
    - data: List of dictionaries (e.g. from .values())
    - headers: List of CSV column headers
    - keys: Keys to extract from each data item (dict)
    - email: Email address to send to
    - slug: Used for the filename
    """
    try:
        # Prepare rows: header row followed by data rows
        rows = [headers]
for item in data:
row = [item.get(key, "") for key in keys]
rows.append(row)
# Generate CSV buffer
csv_buffer = generate_csv_from_rows(rows)
# Send email with CSV attachment
send_export_email(email=email, slug=slug, csv_buffer=csv_buffer, rows=rows)
except Exception as e:
log_exception(e)
return
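
For reference, a hedged example of queueing the task with data shaped like the per-project aggregation in the export view above; the row values, email address and slug are placeholders:

rows = [
    {
        "project__name": "Mobile App",
        "completed_work_items": 10,
        "backlog_work_items": 4,
        "un_started_work_items": 2,
        "started_work_items": 7,
    },
]
export_analytics_to_csv_email.delay(
    rows,
    ["Projects", "Completed Issues", "Backlog Issues", "Unstarted Issues", "Started Issues"],
    ["project__name", "completed_work_items", "backlog_work_items",
     "un_started_work_items", "started_work_items"],
    "user@example.com",
    "my-workspace",
)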

@@ -0,0 +1,205 @@
from typing import Dict, Any, Tuple, Optional, List, Union
# Django imports
from django.db.models import (
Count,
F,
QuerySet,
Aggregate,
)
from plane.db.models import Issue
from rest_framework.exceptions import ValidationError
x_axis_mapper = {
"STATES": "STATES",
"STATE_GROUPS": "STATE_GROUPS",
"LABELS": "LABELS",
"ASSIGNEES": "ASSIGNEES",
"ESTIMATE_POINTS": "ESTIMATE_POINTS",
"CYCLES": "CYCLES",
"MODULES": "MODULES",
"PRIORITY": "PRIORITY",
"START_DATE": "START_DATE",
"TARGET_DATE": "TARGET_DATE",
"CREATED_AT": "CREATED_AT",
"COMPLETED_AT": "COMPLETED_AT",
"CREATED_BY": "CREATED_BY",
}
def get_y_axis_filter(y_axis: str) -> Dict[str, Any]:
filter_mapping = {
"WORK_ITEM_COUNT": {"id": F("id")},
}
return filter_mapping.get(y_axis, {})
def get_x_axis_field() -> Dict[str, Tuple[str, str, Optional[Dict[str, Any]]]]:
return {
"STATES": ("state__id", "state__name", None),
"STATE_GROUPS": ("state__group", "state__group", None),
"LABELS": (
"labels__id",
"labels__name",
{"label_issue__deleted_at__isnull": True},
),
"ASSIGNEES": (
"assignees__id",
"assignees__display_name",
{"issue_assignee__deleted_at__isnull": True},
),
"ESTIMATE_POINTS": ("estimate_point__value", "estimate_point__key", None),
"CYCLES": (
"issue_cycle__cycle_id",
"issue_cycle__cycle__name",
{"issue_cycle__deleted_at__isnull": True},
),
"MODULES": (
"issue_module__module_id",
"issue_module__module__name",
{"issue_module__deleted_at__isnull": True},
),
"PRIORITY": ("priority", "priority", None),
"START_DATE": ("start_date", "start_date", None),
"TARGET_DATE": ("target_date", "target_date", None),
"CREATED_AT": ("created_at__date", "created_at__date", None),
"COMPLETED_AT": ("completed_at__date", "completed_at__date", None),
"CREATED_BY": ("created_by_id", "created_by__display_name", None),
}
def process_grouped_data(
data: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
response = {}
schema = {}
for item in data:
key = item["key"]
if key not in response:
response[key] = {
"key": key if key else "none",
"name": (
item.get("display_name", key)
if item.get("display_name", key)
else "None"
),
"count": 0,
}
group_key = str(item["group_key"]) if item["group_key"] else "none"
schema[group_key] = item.get("group_name", item["group_key"])
schema[group_key] = schema[group_key] if schema[group_key] else "None"
response[key][group_key] = response[key].get(group_key, 0) + item["count"]
response[key]["count"] += item["count"]
return list(response.values()), schema
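# Worked example (illustrative): given annotated rows such as
#   {"key": "High", "display_name": "High", "group_key": "started", "group_name": "started", "count": 3}
#   {"key": "High", "display_name": "High", "group_key": "backlog", "group_name": "backlog", "count": 2}
# the rows fold into
#   [{"key": "High", "name": "High", "count": 5, "started": 3, "backlog": 2}]
# with schema {"started": "started", "backlog": "backlog"}.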
def build_number_chart_response(
queryset: QuerySet[Issue],
y_axis_filter: Dict[str, Any],
y_axis: str,
aggregate_func: Aggregate,
) -> List[Dict[str, Any]]:
count = (
queryset.filter(**y_axis_filter).aggregate(total=aggregate_func).get("total", 0)
)
return [{"key": y_axis, "name": y_axis, "count": count}]
def build_grouped_chart_response(
queryset: QuerySet[Issue],
id_field: str,
name_field: str,
group_field: str,
group_name_field: str,
aggregate_func: Aggregate,
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
data = (
queryset.annotate(
key=F(id_field),
group_key=F(group_field),
group_name=F(group_name_field),
display_name=F(name_field) if name_field else F(id_field),
)
.values("key", "group_key", "group_name", "display_name")
.annotate(count=aggregate_func)
.order_by("-count")
)
return process_grouped_data(data)
def build_simple_chart_response(
queryset: QuerySet, id_field: str, name_field: str, aggregate_func: Aggregate
) -> List[Dict[str, Any]]:
data = (
queryset.annotate(
key=F(id_field), display_name=F(name_field) if name_field else F(id_field)
)
.values("key", "display_name")
.annotate(count=aggregate_func)
.order_by("key")
)
return [
{
"key": item["key"] if item["key"] else "None",
"name": item["display_name"] if item["display_name"] else "None",
"count": item["count"],
}
for item in data
]
def build_analytics_chart(
queryset: QuerySet[Issue],
x_axis: str,
group_by: Optional[str] = None,
date_filter: Optional[str] = None,
) -> Dict[str, Union[List[Dict[str, Any]], Dict[str, str]]]:
# Validate x_axis
if x_axis not in x_axis_mapper:
raise ValidationError(f"Invalid x_axis field: {x_axis}")
# Validate group_by
if group_by and group_by not in x_axis_mapper:
raise ValidationError(f"Invalid group_by field: {group_by}")
field_mapping = get_x_axis_field()
id_field, name_field, additional_filter = field_mapping.get(
x_axis, (None, None, {})
)
group_field, group_name_field, group_additional_filter = field_mapping.get(
group_by, (None, None, {})
)
# Apply additional filters if they exist
    if additional_filter:
        queryset = queryset.filter(**additional_filter)
    if group_additional_filter:
queryset = queryset.filter(**group_additional_filter)
aggregate_func = Count("id", distinct=True)
if group_field:
response, schema = build_grouped_chart_response(
queryset,
id_field,
name_field,
group_field,
group_name_field,
aggregate_func,
)
else:
response = build_simple_chart_response(
queryset, id_field, name_field, aggregate_func
)
schema = {}
return {"data": response, "schema": schema}
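
A hedged usage sketch of `build_analytics_chart`, mirroring how the chart endpoint calls it; the workspace slug is a placeholder:

from plane.db.models import Issue
from plane.utils.build_chart import build_analytics_chart

queryset = Issue.issue_objects.filter(workspace__slug="my-workspace")
result = build_analytics_chart(queryset, x_axis="PRIORITY", group_by="STATE_GROUPS")
# result is {"data": [...], "schema": {...}}; without group_by the schema is empty
# and each data row is {"key": ..., "name": ..., "count": ...}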

@@ -0,0 +1,197 @@
from datetime import datetime, timedelta, date
from django.utils import timezone
from typing import Dict, Optional, List, Union, Tuple, Any
from plane.db.models import User
def get_analytics_date_range(
date_filter: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> Optional[Dict[str, Dict[str, datetime]]]:
"""
Get date range for analytics with current and previous periods for comparison.
Returns a dictionary with current and previous date ranges.
Args:
date_filter (str): The type of date filter to apply
start_date (str): Start date for custom range (format: YYYY-MM-DD)
end_date (str): End date for custom range (format: YYYY-MM-DD)
Returns:
dict: Dictionary containing current and previous date ranges
"""
if not date_filter:
return None
today = timezone.now().date()
if date_filter == "yesterday":
yesterday = today - timedelta(days=1)
return {
"current": {
"gte": datetime.combine(yesterday, datetime.min.time()),
"lte": datetime.combine(yesterday, datetime.max.time()),
}
}
elif date_filter == "last_7_days":
return {
"current": {
"gte": datetime.combine(today - timedelta(days=7), datetime.min.time()),
"lte": datetime.combine(today, datetime.max.time()),
},
"previous": {
"gte": datetime.combine(
today - timedelta(days=14), datetime.min.time()
),
"lte": datetime.combine(today - timedelta(days=8), datetime.max.time()),
},
}
elif date_filter == "last_30_days":
return {
"current": {
"gte": datetime.combine(
today - timedelta(days=30), datetime.min.time()
),
"lte": datetime.combine(today, datetime.max.time()),
},
"previous": {
"gte": datetime.combine(
today - timedelta(days=60), datetime.min.time()
),
"lte": datetime.combine(
today - timedelta(days=31), datetime.max.time()
),
},
}
elif date_filter == "last_3_months":
return {
"current": {
"gte": datetime.combine(
today - timedelta(days=90), datetime.min.time()
),
"lte": datetime.combine(today, datetime.max.time()),
},
"previous": {
"gte": datetime.combine(
today - timedelta(days=180), datetime.min.time()
),
"lte": datetime.combine(
today - timedelta(days=91), datetime.max.time()
),
},
}
elif date_filter == "custom" and start_date and end_date:
try:
start = datetime.strptime(start_date, "%Y-%m-%d").date()
end = datetime.strptime(end_date, "%Y-%m-%d").date()
return {
"current": {
"gte": datetime.combine(start, datetime.min.time()),
"lte": datetime.combine(end, datetime.max.time()),
}
}
except (ValueError, TypeError):
return None
return None
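# Example (illustrative): get_analytics_date_range("last_7_days") returns
#   {"current": {"gte": <today-7 at 00:00>, "lte": <today at 23:59:59.999999>},
#    "previous": {"gte": <today-14 at 00:00>, "lte": <today-8 at 23:59:59.999999>}},
# while "yesterday" and "custom" produce only a "current" window.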
def get_chart_period_range(
date_filter: Optional[str] = None,
) -> Optional[Tuple[date, date]]:
"""
Get date range for chart visualization.
Returns a tuple of (start_date, end_date) for the specified period.
Args:
date_filter (str): The type of date filter to apply. Options are:
- "yesterday": Yesterday's date
- "last_7_days": Last 7 days
- "last_30_days": Last 30 days
- "last_3_months": Last 90 days
Defaults to "last_7_days" if not specified or invalid.
Returns:
tuple: A tuple containing (start_date, end_date) as date objects
"""
if not date_filter:
return None
today = timezone.now().date()
period_ranges = {
"yesterday": (
today - timedelta(days=1),
today - timedelta(days=1),
),
"last_7_days": (today - timedelta(days=7), today),
"last_30_days": (today - timedelta(days=30), today),
"last_3_months": (today - timedelta(days=90), today),
}
return period_ranges.get(date_filter, period_ranges["last_7_days"])
def get_analytics_filters(
slug: str,
user: User,
type: str,
date_filter: Optional[str] = None,
project_ids: Optional[Union[str, List[str]]] = None,
) -> Dict[str, Any]:
"""
Get combined project and date filters for analytics endpoints
Args:
slug: The workspace slug
user: The current user
type: The type of filter ("analytics" or "chart")
date_filter: Optional date filter string
project_ids: Optional list of project IDs or comma-separated string of project IDs
Returns:
dict: A dictionary containing:
- base_filters: Base filters for the workspace and user
- project_filters: Project-specific filters
- analytics_date_range: Date range filters for analytics comparison
- chart_period_range: Date range for chart visualization
"""
# Get project IDs from request
if project_ids and isinstance(project_ids, str):
project_ids = [str(project_id) for project_id in project_ids.split(",")]
# Base filters for workspace and user
base_filters = {
"workspace__slug": slug,
"project__project_projectmember__member": user,
"project__project_projectmember__is_active": True,
}
# Project filters
project_filters = {
"workspace__slug": slug,
"project_projectmember__member": user,
"project_projectmember__is_active": True,
}
# Add project IDs to filters if provided
if project_ids:
base_filters["project_id__in"] = project_ids
project_filters["id__in"] = project_ids
# Initialize date range variables
analytics_date_range = None
chart_period_range = None
# Get date range filters based on type
if type == "analytics":
analytics_date_range = get_analytics_date_range(date_filter)
elif type == "chart":
chart_period_range = get_chart_period_range(date_filter)
return {
"base_filters": base_filters,
"project_filters": project_filters,
"analytics_date_range": analytics_date_range,
"chart_period_range": chart_period_range,
}
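
Taken together, a hedged sketch of how the views compose these filters; the slug, project IDs and user are placeholders for the authenticated request context:

from plane.db.models import Issue
from plane.utils.date_utils import get_analytics_filters

filters = get_analytics_filters(
    slug="my-workspace",
    user=request.user,  # inside a DRF view; any active workspace member
    type="chart",
    date_filter="last_30_days",
    project_ids="123,456",
)
issues = Issue.issue_objects.filter(**filters["base_filters"])
if filters["chart_period_range"]:
    start, end = filters["chart_period_range"]
    issues = issues.filter(created_at__date__gte=start, created_at__date__lte=end)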