[WEB-3728] fix: duplicate sequence ids being generated due to race condition (#6877)
* fix: race condition that was creating duplicate sequence ids * chore: add management command to fix duplicate sequences * chore: update command to take a lock and optimize the script to use a dict instead of loops * chore: update the script to use a transaction
This commit is contained in:
parent
34337f90c1
commit
00f78bd6a1
5 changed files with 139 additions and 16 deletions
|
|
@ -32,7 +32,7 @@ from plane.settings.redis import redis_instance
|
||||||
from plane.utils.exception_logger import log_exception
|
from plane.utils.exception_logger import log_exception
|
||||||
from plane.bgtasks.webhook_task import webhook_activity
|
from plane.bgtasks.webhook_task import webhook_activity
|
||||||
from plane.utils.issue_relation_mapper import get_inverse_relation
|
from plane.utils.issue_relation_mapper import get_inverse_relation
|
||||||
from plane.utils.valid_uuid import is_valid_uuid
|
from plane.utils.uuid import is_valid_uuid
|
||||||
|
|
||||||
|
|
||||||
# Track Changes in name
|
# Track Changes in name
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,102 @@
|
||||||
|
# Django imports
|
||||||
|
from django.core.management.base import BaseCommand, CommandError
|
||||||
|
from django.db.models import Max
|
||||||
|
from django.db import connection, transaction
|
||||||
|
|
||||||
|
# Module imports
|
||||||
|
from plane.db.models import Project, Issue, IssueSequence
|
||||||
|
from plane.utils.uuid import convert_uuid_to_integer
|
||||||
|
|
||||||
|
|
||||||
|
class Command(BaseCommand):
    """Repair issues that share the same sequence id within a project.

    All but one of the issues carrying the colliding sequence id are
    re-numbered to fresh ids above the project's current maximum, while a
    per-project Postgres advisory lock is held so concurrent issue creation
    cannot hand out the same numbers.
    """

    help = "Fix duplicate sequences"

    def add_arguments(self, parser):
        # Positional argument, e.g. "PROJ-42" (<project identifier>-<sequence>).
        parser.add_argument("issue_identifier", type=str, help="Issue Identifier")

    def strict_str_to_int(self, s):
        """Convert *s* to int, raising ValueError unless it is a plain
        (optionally negative) decimal integer — no whitespace, no floats."""
        if not s.isdigit() and not (s.startswith("-") and s[1:].isdigit()):
            raise ValueError("Invalid integer string")
        return int(s)

    def handle(self, *args, **options):
        workspace_slug = input("Workspace slug: ")

        if not workspace_slug:
            raise CommandError("Workspace slug is required")

        issue_identifier = options.get("issue_identifier", False)

        # Validate issue_identifier
        if not issue_identifier:
            raise CommandError("Issue identifier is required")

        try:
            # Expected shape: "<PROJECT_IDENTIFIER>-<SEQUENCE>"
            identifier = issue_identifier.split("-")

            if len(identifier) != 2:
                raise ValueError("Invalid issue identifier format")

            project_identifier = identifier[0]
            issue_sequence = self.strict_str_to_int(identifier[1])

            # Fetch the project
            project = Project.objects.get(
                identifier__iexact=project_identifier, workspace__slug=workspace_slug
            )

            # Deterministic ordering: without an order_by the slice below is
            # arbitrary, so the issue that KEEPS the original sequence id
            # could differ between runs.
            issues = Issue.objects.filter(
                project=project, sequence_id=issue_sequence
            ).order_by("id")

            # Check for duplicates; cache the count — every .count() call is
            # a separate query.
            duplicate_count = issues.count()
            if duplicate_count <= 1:
                raise CommandError(
                    "No duplicate issues found with the given identifier"
                )

            self.stdout.write(
                self.style.SUCCESS(
                    f"{duplicate_count} issues found with identifier {issue_identifier}"
                )
            )

            with transaction.atomic():
                # This ensures only one transaction per project can execute
                # this code at a time.
                lock_key = convert_uuid_to_integer(project.id)

                # Acquire an exclusive lock using the project ID as the lock
                # key; xact-scoped, so it is released at commit/rollback.
                with connection.cursor() as cursor:
                    cursor.execute("SELECT pg_advisory_xact_lock(%s)", [lock_key])

                # Maximum sequence for the project. The aggregate is None when
                # no IssueSequence rows exist (the map .get() below shows rows
                # can be missing), which would otherwise crash the arithmetic;
                # also never re-number below the colliding sequence itself.
                last_sequence = IssueSequence.objects.filter(project=project).aggregate(
                    largest=Max("sequence")
                )["largest"]
                last_sequence = max(last_sequence or 0, issue_sequence)

                bulk_issues = []
                bulk_issue_sequences = []

                # issue_id -> IssueSequence row, so the loop below does not
                # issue one query per duplicate.
                issue_sequence_map = {
                    isq.issue_id: isq
                    for isq in IssueSequence.objects.filter(project=project)
                }

                # Re-number every duplicate except the first (kept) issue.
                for index, issue in enumerate(issues[1:]):
                    updated_sequence_id = last_sequence + index + 1
                    issue.sequence_id = updated_sequence_id
                    bulk_issues.append(issue)

                    # Keep the matching IssueSequence row in sync, if any.
                    sequence_identifier = issue_sequence_map.get(issue.id)
                    if sequence_identifier:
                        sequence_identifier.sequence = updated_sequence_id
                        bulk_issue_sequences.append(sequence_identifier)

                Issue.objects.bulk_update(bulk_issues, ["sequence_id"])
                IssueSequence.objects.bulk_update(bulk_issue_sequences, ["sequence"])

            self.stdout.write(self.style.SUCCESS("Sequence IDs updated successfully"))
        except CommandError:
            # Already a user-facing command error — don't re-wrap it.
            raise
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise CommandError(str(e)) from e
|
||||||
|
|
@ -6,7 +6,7 @@ from django.conf import settings
|
||||||
from django.contrib.postgres.fields import ArrayField
|
from django.contrib.postgres.fields import ArrayField
|
||||||
from django.core.exceptions import ValidationError
|
from django.core.exceptions import ValidationError
|
||||||
from django.core.validators import MaxValueValidator, MinValueValidator
|
from django.core.validators import MaxValueValidator, MinValueValidator
|
||||||
from django.db import models, transaction
|
from django.db import models, transaction, connection
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
from django import apps
|
from django import apps
|
||||||
|
|
@ -15,8 +15,8 @@ from django import apps
|
||||||
from plane.utils.html_processor import strip_tags
|
from plane.utils.html_processor import strip_tags
|
||||||
from plane.db.mixins import SoftDeletionManager
|
from plane.db.mixins import SoftDeletionManager
|
||||||
from plane.utils.exception_logger import log_exception
|
from plane.utils.exception_logger import log_exception
|
||||||
from .base import BaseModel
|
|
||||||
from .project import ProjectBaseModel
|
from .project import ProjectBaseModel
|
||||||
|
from plane.utils.uuid import convert_uuid_to_integer
|
||||||
|
|
||||||
|
|
||||||
def get_default_properties():
|
def get_default_properties():
|
||||||
|
|
@ -209,11 +209,18 @@ class Issue(ProjectBaseModel):
|
||||||
|
|
||||||
if self._state.adding:
|
if self._state.adding:
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
last_sequence = (
|
# Create a lock for this specific project using an advisory lock
|
||||||
IssueSequence.objects.filter(project=self.project)
|
# This ensures only one transaction per project can execute this code at a time
|
||||||
.select_for_update()
|
lock_key = convert_uuid_to_integer(self.project.id)
|
||||||
.aggregate(largest=models.Max("sequence"))["largest"]
|
|
||||||
)
|
with connection.cursor() as cursor:
|
||||||
|
# Get an exclusive lock using the project ID as the lock key
|
||||||
|
cursor.execute("SELECT pg_advisory_xact_lock(%s)", [lock_key])
|
||||||
|
|
||||||
|
# Get the last sequence for the project
|
||||||
|
last_sequence = IssueSequence.objects.filter(
|
||||||
|
project=self.project
|
||||||
|
).aggregate(largest=models.Max("sequence"))["largest"]
|
||||||
self.sequence_id = last_sequence + 1 if last_sequence else 1
|
self.sequence_id = last_sequence + 1 if last_sequence else 1
|
||||||
# Strip the html tags using html parser
|
# Strip the html tags using html parser
|
||||||
self.description_stripped = (
|
self.description_stripped = (
|
||||||
|
|
|
||||||
22
apiserver/plane/utils/uuid.py
Normal file
22
apiserver/plane/utils/uuid.py
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
# Python imports
|
||||||
|
import uuid
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
|
||||||
|
def is_valid_uuid(uuid_str):
    """Check if a string is a valid UUID version 4.

    Returns False for malformed strings, for UUIDs of any other version,
    and for non-string input (None, int, ...) instead of raising.
    """
    try:
        uuid_obj = uuid.UUID(uuid_str)
    except (AttributeError, TypeError, ValueError):
        # uuid.UUID raises ValueError for malformed strings, and
        # AttributeError/TypeError for non-string input; a validator
        # should report False rather than propagate those.
        return False
    return uuid_obj.version == 4
|
||||||
|
|
||||||
|
|
||||||
|
def convert_uuid_to_integer(uuid_val: uuid.UUID) -> int:
    """Convert a UUID to a 64-bit signed integer.

    The UUID's canonical string form is SHA-256 hashed and the first
    8 bytes of the digest are read as a big-endian signed integer, so
    the result is deterministic and fits a 64-bit signed range.
    """
    digest: bytes = hashlib.sha256(str(uuid_val).encode()).digest()
    return int.from_bytes(digest[:8], "big", signed=True)
|
||||||
|
|
@ -1,8 +0,0 @@
|
||||||
import uuid
|
|
||||||
|
|
||||||
def is_valid_uuid(uuid_str):
    """Check if a string is a valid UUID version 4.

    Note: the previous ``uuid.UUID(uuid_str, version=4)`` call did NOT
    validate the version — the ``version`` argument *overwrites* the
    version bits, so any well-formed UUID of any version was accepted.
    Parse first, then check ``.version`` explicitly.
    """
    try:
        parsed = uuid.UUID(uuid_str)
    except (AttributeError, TypeError, ValueError):
        # Malformed strings raise ValueError; non-string input raises
        # AttributeError/TypeError — all mean "not a valid UUID".
        return False
    return parsed.version == 4
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue