chore: added validation for description (#7507)
* added PageBinaryUpdateSerializer for binary data validation and update * chore: added validation for description * chore: removed the duplicated file * Fixed coderabbit comments - Improve content validation by consolidating patterns and enhancing recursion checks - Updated `PageBinaryUpdateSerializer` to simplify assignment of validated data. - Enhanced `content_validator.py` with consolidated dangerous patterns and added recursion depth checks to prevent stack overflow during validation. - Improved readability and maintainability of validation functions by using constants for patterns. --------- Co-authored-by: Dheeraj Kumar Ketireddy <dheeru0198@gmail.com>
This commit is contained in:
parent
b93883fc14
commit
69d5cd183f
11 changed files with 613 additions and 16 deletions
|
|
@ -21,6 +21,11 @@ from plane.db.models import (
|
||||||
State,
|
State,
|
||||||
User,
|
User,
|
||||||
)
|
)
|
||||||
|
from plane.utils.content_validator import (
|
||||||
|
validate_html_content,
|
||||||
|
validate_json_content,
|
||||||
|
validate_binary_data,
|
||||||
|
)
|
||||||
|
|
||||||
from .base import BaseSerializer
|
from .base import BaseSerializer
|
||||||
from .cycle import CycleLiteSerializer, CycleSerializer
|
from .cycle import CycleLiteSerializer, CycleSerializer
|
||||||
|
|
@ -75,6 +80,22 @@ class IssueSerializer(BaseSerializer):
|
||||||
except Exception:
|
except Exception:
|
||||||
raise serializers.ValidationError("Invalid HTML passed")
|
raise serializers.ValidationError("Invalid HTML passed")
|
||||||
|
|
||||||
|
# Validate description content for security
|
||||||
|
if data.get("description"):
|
||||||
|
is_valid, error_msg = validate_json_content(data["description"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description": error_msg})
|
||||||
|
|
||||||
|
if data.get("description_html"):
|
||||||
|
is_valid, error_msg = validate_html_content(data["description_html"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description_html": error_msg})
|
||||||
|
|
||||||
|
if data.get("description_binary"):
|
||||||
|
is_valid, error_msg = validate_binary_data(data["description_binary"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description_binary": error_msg})
|
||||||
|
|
||||||
# Validate assignees are from project
|
# Validate assignees are from project
|
||||||
if data.get("assignees", []):
|
if data.get("assignees", []):
|
||||||
data["assignees"] = ProjectMember.objects.filter(
|
data["assignees"] = ProjectMember.objects.filter(
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,11 @@ from rest_framework import serializers
|
||||||
|
|
||||||
# Module imports
|
# Module imports
|
||||||
from plane.db.models import Project, ProjectIdentifier, WorkspaceMember
|
from plane.db.models import Project, ProjectIdentifier, WorkspaceMember
|
||||||
|
from plane.utils.content_validator import (
|
||||||
|
validate_html_content,
|
||||||
|
validate_json_content,
|
||||||
|
validate_binary_data,
|
||||||
|
)
|
||||||
|
|
||||||
from .base import BaseSerializer
|
from .base import BaseSerializer
|
||||||
|
|
||||||
|
|
@ -57,6 +62,29 @@ class ProjectSerializer(BaseSerializer):
|
||||||
"Default assignee should be a user in the workspace"
|
"Default assignee should be a user in the workspace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Validate description content for security
|
||||||
|
if "description" in data and data["description"]:
|
||||||
|
# For Project, description might be text field, not JSON
|
||||||
|
if isinstance(data["description"], dict):
|
||||||
|
is_valid, error_msg = validate_json_content(data["description"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description": error_msg})
|
||||||
|
|
||||||
|
if "description_text" in data and data["description_text"]:
|
||||||
|
is_valid, error_msg = validate_json_content(data["description_text"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description_text": error_msg})
|
||||||
|
|
||||||
|
if "description_html" in data and data["description_html"]:
|
||||||
|
if isinstance(data["description_html"], dict):
|
||||||
|
is_valid, error_msg = validate_json_content(data["description_html"])
|
||||||
|
else:
|
||||||
|
is_valid, error_msg = validate_html_content(
|
||||||
|
str(data["description_html"])
|
||||||
|
)
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description_html": error_msg})
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def create(self, validated_data):
|
def create(self, validated_data):
|
||||||
|
|
|
||||||
|
|
@ -96,6 +96,7 @@ from .page import (
|
||||||
SubPageSerializer,
|
SubPageSerializer,
|
||||||
PageDetailSerializer,
|
PageDetailSerializer,
|
||||||
PageVersionSerializer,
|
PageVersionSerializer,
|
||||||
|
PageBinaryUpdateSerializer,
|
||||||
PageVersionDetailSerializer,
|
PageVersionDetailSerializer,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,11 @@ from plane.db.models import (
|
||||||
DraftIssueCycle,
|
DraftIssueCycle,
|
||||||
DraftIssueModule,
|
DraftIssueModule,
|
||||||
)
|
)
|
||||||
|
from plane.utils.content_validator import (
|
||||||
|
validate_html_content,
|
||||||
|
validate_json_content,
|
||||||
|
validate_binary_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class DraftIssueCreateSerializer(BaseSerializer):
|
class DraftIssueCreateSerializer(BaseSerializer):
|
||||||
|
|
@ -64,6 +69,23 @@ class DraftIssueCreateSerializer(BaseSerializer):
|
||||||
and data.get("start_date", None) > data.get("target_date", None)
|
and data.get("start_date", None) > data.get("target_date", None)
|
||||||
):
|
):
|
||||||
raise serializers.ValidationError("Start date cannot exceed target date")
|
raise serializers.ValidationError("Start date cannot exceed target date")
|
||||||
|
|
||||||
|
# Validate description content for security
|
||||||
|
if "description" in data and data["description"]:
|
||||||
|
is_valid, error_msg = validate_json_content(data["description"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description": error_msg})
|
||||||
|
|
||||||
|
if "description_html" in data and data["description_html"]:
|
||||||
|
is_valid, error_msg = validate_html_content(data["description_html"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description_html": error_msg})
|
||||||
|
|
||||||
|
if "description_binary" in data and data["description_binary"]:
|
||||||
|
is_valid, error_msg = validate_binary_data(data["description_binary"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description_binary": error_msg})
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def create(self, validated_data):
|
def create(self, validated_data):
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,11 @@ from plane.db.models import (
|
||||||
IssueDescriptionVersion,
|
IssueDescriptionVersion,
|
||||||
ProjectMember,
|
ProjectMember,
|
||||||
)
|
)
|
||||||
|
from plane.utils.content_validator import (
|
||||||
|
validate_html_content,
|
||||||
|
validate_json_content,
|
||||||
|
validate_binary_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class IssueFlatSerializer(BaseSerializer):
|
class IssueFlatSerializer(BaseSerializer):
|
||||||
|
|
@ -127,6 +132,22 @@ class IssueCreateSerializer(BaseSerializer):
|
||||||
member_id__in=attrs["assignee_ids"],
|
member_id__in=attrs["assignee_ids"],
|
||||||
).values_list("member_id", flat=True)
|
).values_list("member_id", flat=True)
|
||||||
|
|
||||||
|
# Validate description content for security
|
||||||
|
if "description" in attrs and attrs["description"]:
|
||||||
|
is_valid, error_msg = validate_json_content(attrs["description"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description": error_msg})
|
||||||
|
|
||||||
|
if "description_html" in attrs and attrs["description_html"]:
|
||||||
|
is_valid, error_msg = validate_html_content(attrs["description_html"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description_html": error_msg})
|
||||||
|
|
||||||
|
if "description_binary" in attrs and attrs["description_binary"]:
|
||||||
|
is_valid, error_msg = validate_binary_data(attrs["description_binary"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description_binary": error_msg})
|
||||||
|
|
||||||
return attrs
|
return attrs
|
||||||
|
|
||||||
def create(self, validated_data):
|
def create(self, validated_data):
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,14 @@
|
||||||
# Third party imports
|
# Third party imports
|
||||||
from rest_framework import serializers
|
from rest_framework import serializers
|
||||||
|
import base64
|
||||||
|
|
||||||
# Module imports
|
# Module imports
|
||||||
from .base import BaseSerializer
|
from .base import BaseSerializer
|
||||||
|
from plane.utils.content_validator import (
|
||||||
|
validate_binary_data,
|
||||||
|
validate_html_content,
|
||||||
|
validate_json_content,
|
||||||
|
)
|
||||||
from plane.db.models import (
|
from plane.db.models import (
|
||||||
Page,
|
Page,
|
||||||
PageLog,
|
PageLog,
|
||||||
|
|
@ -186,3 +192,71 @@ class PageVersionDetailSerializer(BaseSerializer):
|
||||||
"updated_by",
|
"updated_by",
|
||||||
]
|
]
|
||||||
read_only_fields = ["workspace", "page"]
|
read_only_fields = ["workspace", "page"]
|
||||||
|
|
||||||
|
|
||||||
|
class PageBinaryUpdateSerializer(serializers.Serializer):
    """Validate and apply updates to a page's description fields.

    All three fields are optional. Each one that is present is checked with
    the matching helper from ``plane.utils.content_validator`` before
    ``update`` copies it onto the page instance.
    """

    description_binary = serializers.CharField(required=False, allow_blank=True)
    description_html = serializers.CharField(required=False, allow_blank=True)
    description = serializers.JSONField(required=False, allow_null=True)

    def validate_description_binary(self, value):
        """Decode the base64 payload, validate it, and return the raw bytes.

        Raises:
            serializers.ValidationError: if the value is not decodable base64
                or the decoded bytes fail ``validate_binary_data``.
        """
        if not value:
            # NOTE(review): a blank string is returned unchanged, so ``update``
            # will assign "" to ``description_binary`` — confirm the model
            # field accepts that.
            return value

        # Keep the try body to the single call that can fail on bad input;
        # binascii.Error (bad padding/alphabet) is a ValueError subclass.
        try:
            binary_data = base64.b64decode(value)
        except (ValueError, TypeError) as exc:
            raise serializers.ValidationError("Failed to decode base64 data") from exc

        is_valid, error_message = validate_binary_data(binary_data)
        if not is_valid:
            raise serializers.ValidationError(f"Invalid binary data: {error_message}")

        return binary_data

    def validate_description_html(self, value):
        """Reject HTML that ``validate_html_content`` flags as unsafe."""
        if not value:
            return value

        is_valid, error_message = validate_html_content(value)
        if not is_valid:
            raise serializers.ValidationError(error_message)

        return value

    def validate_description(self, value):
        """Reject JSON that ``validate_json_content`` flags as unsafe."""
        if not value:
            return value

        is_valid, error_message = validate_json_content(value)
        if not is_valid:
            raise serializers.ValidationError(error_message)

        return value

    def update(self, instance, validated_data):
        """Copy each supplied description field onto ``instance`` and save it.

        Fields absent from ``validated_data`` are left untouched, which is
        what makes partial updates safe.
        """
        for field_name in ("description_binary", "description_html", "description"):
            if field_name in validated_data:
                setattr(instance, field_name, validated_data[field_name])

        instance.save()
        return instance
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,11 @@ from plane.db.models import (
|
||||||
DeployBoard,
|
DeployBoard,
|
||||||
ProjectPublicMember,
|
ProjectPublicMember,
|
||||||
)
|
)
|
||||||
|
from plane.utils.content_validator import (
|
||||||
|
validate_html_content,
|
||||||
|
validate_json_content,
|
||||||
|
validate_binary_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ProjectSerializer(BaseSerializer):
|
class ProjectSerializer(BaseSerializer):
|
||||||
|
|
@ -58,6 +63,32 @@ class ProjectSerializer(BaseSerializer):
|
||||||
|
|
||||||
return identifier
|
return identifier
|
||||||
|
|
||||||
|
def validate(self, data):
    """Run security validation over the project's description fields.

    ``description`` is only checked when it is a dict (it may be plain
    text), ``description_text`` is always checked as JSON, and
    ``description_html`` is checked as JSON when it is a dict and as HTML
    otherwise. Empty or missing fields are skipped.

    Raises:
        serializers.ValidationError: keyed by the offending field name.
    """
    description = data.get("description")
    if description and isinstance(description, dict):
        ok, reason = validate_json_content(description)
        if not ok:
            raise serializers.ValidationError({"description": reason})

    description_text = data.get("description_text")
    if description_text:
        ok, reason = validate_json_content(description_text)
        if not ok:
            raise serializers.ValidationError({"description_text": reason})

    description_html = data.get("description_html")
    if description_html:
        # The field may arrive either as a JSON object or as markup.
        if isinstance(description_html, dict):
            ok, reason = validate_json_content(description_html)
        else:
            ok, reason = validate_html_content(str(description_html))
        if not ok:
            raise serializers.ValidationError({"description_html": reason})

    return data
|
||||||
|
|
||||||
def create(self, validated_data):
|
def create(self, validated_data):
|
||||||
workspace_id = self.context["workspace_id"]
|
workspace_id = self.context["workspace_id"]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,11 @@ from plane.db.models import (
|
||||||
)
|
)
|
||||||
from plane.utils.constants import RESTRICTED_WORKSPACE_SLUGS
|
from plane.utils.constants import RESTRICTED_WORKSPACE_SLUGS
|
||||||
from plane.utils.url import contains_url
|
from plane.utils.url import contains_url
|
||||||
|
from plane.utils.content_validator import (
|
||||||
|
validate_html_content,
|
||||||
|
validate_json_content,
|
||||||
|
validate_binary_data,
|
||||||
|
)
|
||||||
|
|
||||||
# Django imports
|
# Django imports
|
||||||
from django.core.validators import URLValidator
|
from django.core.validators import URLValidator
|
||||||
|
|
@ -312,6 +317,25 @@ class StickySerializer(BaseSerializer):
|
||||||
read_only_fields = ["workspace", "owner"]
|
read_only_fields = ["workspace", "owner"]
|
||||||
extra_kwargs = {"name": {"required": False}}
|
extra_kwargs = {"name": {"required": False}}
|
||||||
|
|
||||||
|
def validate(self, data):
    """Run security validation over the sticky's description fields.

    Each non-empty field is passed to its validator in a fixed order
    (JSON, then HTML, then binary); the first failure is reported.

    Raises:
        serializers.ValidationError: keyed by the offending field name.
    """
    field_validators = (
        ("description", validate_json_content),
        ("description_html", validate_html_content),
        ("description_binary", validate_binary_data),
    )

    for field_name, validator in field_validators:
        payload = data.get(field_name)
        if not payload:
            continue  # missing or empty fields need no validation

        ok, reason = validator(payload)
        if not ok:
            raise serializers.ValidationError({field_name: reason})

    return data
|
||||||
|
|
||||||
|
|
||||||
class WorkspaceUserPreferenceSerializer(BaseSerializer):
|
class WorkspaceUserPreferenceSerializer(BaseSerializer):
|
||||||
class Meta:
|
class Meta:
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ from plane.app.serializers import (
|
||||||
PageSerializer,
|
PageSerializer,
|
||||||
SubPageSerializer,
|
SubPageSerializer,
|
||||||
PageDetailSerializer,
|
PageDetailSerializer,
|
||||||
|
PageBinaryUpdateSerializer,
|
||||||
)
|
)
|
||||||
from plane.db.models import (
|
from plane.db.models import (
|
||||||
Page,
|
Page,
|
||||||
|
|
@ -538,32 +539,27 @@ class PagesDescriptionViewSet(BaseViewSet):
|
||||||
{"description_html": page.description_html}, cls=DjangoJSONEncoder
|
{"description_html": page.description_html}, cls=DjangoJSONEncoder
|
||||||
)
|
)
|
||||||
|
|
||||||
# Get the base64 data from the request
|
# Use serializer for validation and update
|
||||||
base64_data = request.data.get("description_binary")
|
serializer = PageBinaryUpdateSerializer(page, data=request.data, partial=True)
|
||||||
|
if serializer.is_valid():
|
||||||
# If base64 data is provided
|
# Capture the page transaction
|
||||||
if base64_data:
|
|
||||||
# Decode the base64 data to bytes
|
|
||||||
new_binary_data = base64.b64decode(base64_data)
|
|
||||||
# capture the page transaction
|
|
||||||
if request.data.get("description_html"):
|
if request.data.get("description_html"):
|
||||||
page_transaction.delay(
|
page_transaction.delay(
|
||||||
new_value=request.data, old_value=existing_instance, page_id=pk
|
new_value=request.data, old_value=existing_instance, page_id=pk
|
||||||
)
|
)
|
||||||
# Store the updated binary data
|
|
||||||
page.description_binary = new_binary_data
|
# Update the page using serializer
|
||||||
page.description_html = request.data.get("description_html")
|
updated_page = serializer.save()
|
||||||
page.description = request.data.get("description")
|
|
||||||
page.save()
|
# Run background tasks
|
||||||
# Return a success response
|
|
||||||
page_version.delay(
|
page_version.delay(
|
||||||
page_id=page.id,
|
page_id=updated_page.id,
|
||||||
existing_instance=existing_instance,
|
existing_instance=existing_instance,
|
||||||
user_id=request.user.id,
|
user_id=request.user.id,
|
||||||
)
|
)
|
||||||
return Response({"message": "Updated successfully"})
|
return Response({"message": "Updated successfully"})
|
||||||
else:
|
else:
|
||||||
return Response({"error": "No binary data provided"})
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
||||||
|
|
||||||
|
|
||||||
class PageDuplicateEndpoint(BaseAPIView):
|
class PageDuplicateEndpoint(BaseAPIView):
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,11 @@ from plane.db.models import (
|
||||||
IssueVote,
|
IssueVote,
|
||||||
IssueRelation,
|
IssueRelation,
|
||||||
)
|
)
|
||||||
|
from plane.utils.content_validator import (
|
||||||
|
validate_html_content,
|
||||||
|
validate_json_content,
|
||||||
|
validate_binary_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class IssueStateFlatSerializer(BaseSerializer):
|
class IssueStateFlatSerializer(BaseSerializer):
|
||||||
|
|
@ -283,6 +288,23 @@ class IssueCreateSerializer(BaseSerializer):
|
||||||
and data.get("start_date", None) > data.get("target_date", None)
|
and data.get("start_date", None) > data.get("target_date", None)
|
||||||
):
|
):
|
||||||
raise serializers.ValidationError("Start date cannot exceed target date")
|
raise serializers.ValidationError("Start date cannot exceed target date")
|
||||||
|
|
||||||
|
# Validate description content for security
|
||||||
|
if "description" in data and data["description"]:
|
||||||
|
is_valid, error_msg = validate_json_content(data["description"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description": error_msg})
|
||||||
|
|
||||||
|
if "description_html" in data and data["description_html"]:
|
||||||
|
is_valid, error_msg = validate_html_content(data["description_html"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description_html": error_msg})
|
||||||
|
|
||||||
|
if "description_binary" in data and data["description_binary"]:
|
||||||
|
is_valid, error_msg = validate_binary_data(data["description_binary"])
|
||||||
|
if not is_valid:
|
||||||
|
raise serializers.ValidationError({"description_binary": error_msg})
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def create(self, validated_data):
|
def create(self, validated_data):
|
||||||
|
|
|
||||||
357
apps/api/plane/utils/content_validator.py
Normal file
357
apps/api/plane/utils/content_validator.py
Normal file
|
|
@ -0,0 +1,357 @@
|
||||||
|
# Python imports
|
||||||
|
import base64
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
# Upper bound, in bytes, applied to binary, HTML and JSON payloads (10MB).
MAX_SIZE = 10 * 1024 * 1024

# Nesting limit for the recursive JSON validators.
MAX_RECURSION_DEPTH = 20

# Regexes applied to free text inside JSON documents.
# NOTE(review): `document\s*\.`, `window\s*\.` and `location\s*\.` also match
# ordinary prose ending in those words (e.g. "see the document.") — confirm
# that this false-positive rate is acceptable.
DANGEROUS_TEXT_PATTERNS = [
    r"<script[^>]*>.*?</script>",
    r"javascript\s*:",
    r"data\s*:\s*text/html",
    r"eval\s*\(",
    r"document\s*\.",
    r"window\s*\.",
    r"location\s*\.",
]

# Regexes applied to selected attribute values inside JSON documents.
DANGEROUS_ATTR_PATTERNS = [
    r"javascript\s*:",
    r"data\s*:\s*text/html",
    r"eval\s*\(",
    r"alert\s*\(",
    r"document\s*\.",
    r"window\s*\.",
]

# Lower-case substrings searched for in the decoded head of binary payloads.
SUSPICIOUS_BINARY_PATTERNS = [
    "<html",
    "<!doctype",
    "<script",
    "javascript:",
    "data:",
    "<iframe",
]

# Regexes that cause HTML content to be rejected outright. Order matters:
# the first matching pattern is the one echoed in the error message.
MALICIOUS_HTML_PATTERNS = [
    # Opening and closing script tags
    r"<script[^>]*>",
    r"</script>",
    # javascript: URLs in link-like attributes
    r'(?:href|src|action)\s*=\s*["\']?\s*javascript:',
    # data:text/html URLs in link-like attributes
    r'(?:href|src|action)\s*=\s*["\']?\s*data:text/html',
    # Well-known event handlers whose value mentions script-like tokens
    (
        r"on(?:load|error|click|focus|blur|change|submit|reset|select|resize|scroll"
        r"|unload|beforeunload|hashchange|popstate|storage|message|offline|online)"
        r'\s*=\s*["\']?[^"\']*'
        r"(?:javascript|alert|eval|document\.|window\.|location\.|history\.)"
        r'[^"\']*["\']?'
    ),
    # <object>/<embed> that load content
    r"<(?:object|embed)[^>]*(?:data|src)\s*=",
    # <base href=...> changes how relative URLs resolve
    r"<base[^>]*href\s*=",
    # iframes pointing at javascript: or data:text/html
    r'<iframe[^>]*src\s*=\s*["\']?(?:javascript:|data:text/html)',
    # <meta http-equiv="refresh"> redirects
    r'<meta[^>]*http-equiv\s*=\s*["\']?refresh["\']?',
    # <link> stylesheets and externally hosted links
    r'<link[^>]*rel\s*=\s*["\']?stylesheet["\']?',
    r'<link[^>]*href\s*=\s*["\']?https?://',
    r'<link[^>]*href\s*=\s*["\']?//',
    r'<link[^>]*href\s*=\s*["\']?(?:data:|javascript:)',
    # <style> blocks importing external resources
    r"<style[^>]*>.*?@import.*?(?:https?://|//)",
    # <link> with resource-hint / import rel values
    r'<link[^>]*rel\s*=\s*["\']?(?:import|preload|prefetch|dns-prefetch|preconnect)["\']?',
    # Any <form> that declares an action
    r"<form[^>]*action\s*=",
]

# Regexes applied to the value of any quoted inline `on*=` handler.
DANGEROUS_JS_PATTERNS = [
    r"alert\s*\(",
    r"eval\s*\(",
    r"document\s*\.",
    r"window\s*\.",
    r"location\s*\.",
    r"fetch\s*\(",
    r"XMLHttpRequest",
    r"innerHTML\s*=",
    r"outerHTML\s*=",
    r"document\.write",
    r"script\s*>",
]

# Tags excluded from the opening-tag count because they take no closing tag.
SELF_CLOSING_TAGS = {
    "area",
    "base",
    "br",
    "col",
    "embed",
    "hr",
    "img",
    "input",
    "link",
    "meta",
    "source",
    "track",
    "wbr",
}
|
||||||
|
|
||||||
|
|
||||||
|
def validate_binary_data(data):
    """
    Validate that binary data looks like a document payload rather than markup.

    Args:
        data (bytes, bytearray, memoryview or str): The raw binary data, or a
            base64-encoded string which is decoded before checking.

    Returns:
        tuple: (is_valid: bool, error_message: str or None)
    """
    if not data:
        return True, None  # Empty is OK

    if isinstance(data, str):
        # Strings are treated as base64. binascii.Error (bad padding or
        # alphabet) and the non-ASCII error are both ValueError subclasses.
        try:
            binary_data = base64.b64decode(data)
        except (ValueError, TypeError):
            return False, "Invalid base64 encoding"
    elif isinstance(data, (bytearray, memoryview)):
        # Normalise buffer types so the content scan below always runs;
        # memoryview has no .decode() and previously skipped it silently.
        binary_data = bytes(data)
    else:
        binary_data = data

    # Size check - 10MB limit
    if len(binary_data) > MAX_SIZE:
        return False, "Binary data exceeds maximum size limit (10MB)"

    # Basic format validation
    if len(binary_data) < 4:
        return False, "Binary data too short to be valid document format"

    # Look for HTML/JS markers in the first 200 decoded characters. Bytes that
    # are not valid UTF-8 are dropped, so genuinely binary content is fine.
    if isinstance(binary_data, bytes):
        head = binary_data.decode("utf-8", errors="ignore")[:200].lower()
        if any(marker in head for marker in SUSPICIOUS_BINARY_PATTERNS):
            return False, "Binary data contains suspicious content patterns"

    return True, None
|
||||||
|
|
||||||
|
|
||||||
|
def validate_html_content(html_content):
    """
    Validate that HTML content is safe and doesn't contain malicious patterns.

    Checks run in this order: type, size, the module's malicious-pattern
    list, quoted inline event handlers, then a rough open/close tag balance.

    Args:
        html_content (str): The HTML content to validate

    Returns:
        tuple: (is_valid: bool, error_message: str or None)
    """
    if not html_content:
        return True, None  # Empty is OK

    # Report a validation failure instead of crashing on non-text input.
    if not isinstance(html_content, str):
        return False, "HTML content must be a string"

    # Size check - 10MB limit (consistent with binary validation). Lone
    # surrogates cannot be encoded as UTF-8, so they are rejected here.
    try:
        encoded_size = len(html_content.encode("utf-8"))
    except UnicodeEncodeError:
        return False, "HTML content is not valid UTF-8 text"
    if encoded_size > MAX_SIZE:
        return False, "HTML content exceeds maximum size limit (10MB)"

    # Reject on the first malicious pattern found.
    for pattern in MALICIOUS_HTML_PATTERNS:
        if re.search(pattern, html_content, re.IGNORECASE | re.DOTALL):
            return (
                False,
                f"HTML content contains potentially malicious patterns: {pattern}",
            )

    # Inspect the value of every quoted inline `on*=` handler. This is
    # deliberately permissive: a handler is only rejected when its value
    # contains one of the dangerous JavaScript patterns.
    event_handler_pattern = r'on\w+\s*=\s*["\']([^"\']*)["\']'
    for handler_content in re.findall(event_handler_pattern, html_content, re.IGNORECASE):
        if any(
            re.search(js_pattern, handler_content, re.IGNORECASE)
            for js_pattern in DANGEROUS_JS_PATTERNS
        ):
            return (
                False,
                f"HTML content contains dangerous JavaScript in event handler: {handler_content[:100]}",
            )

    # Rough structure check: count opening tags (ignoring tags that take no
    # closing tag) against closing tags, allowing a tolerance of 10.
    opening_count = sum(
        1
        for tag in re.findall(r"<(\w+)[^>]*>", html_content)
        if tag.lower() not in SELF_CLOSING_TAGS
    )
    closing_count = len(re.findall(r"</(\w+)>", html_content))
    if opening_count > closing_count + 10:
        return False, "HTML content appears to be malformed (unmatched tags)"

    return True, None
|
||||||
|
|
||||||
|
|
||||||
|
def validate_json_content(json_content):
    """
    Validate that JSON content is safe and doesn't contain malicious patterns.

    A ProseMirror/Tiptap style document (``{"type": "doc", "content": [...]}``)
    is checked node by node. Every other dict is scanned generically, so no
    object shape is accepted without inspection.

    Args:
        json_content (dict): The JSON content to validate

    Returns:
        tuple: (is_valid: bool, error_message: str or None)
    """
    if not json_content:
        return True, None  # Empty is OK

    try:
        # Size check - 10MB limit (consistent with other validations).
        # Serialising also surfaces non-JSON values and circular references.
        json_str = json.dumps(json_content)
        if len(json_str.encode("utf-8")) > MAX_SIZE:
            return False, "JSON content exceeds maximum size limit (10MB)"

        if not isinstance(json_content, dict):
            return False, "JSON description must be a valid object"

        is_document = json_content.get("type") == "doc" and isinstance(
            json_content.get("content"), list
        )
        if is_document:
            is_valid, error_msg = _validate_json_content_array(json_content["content"])
        else:
            # Previously dicts with a non-"doc" type, or with only one of
            # "type"/"content", bypassed every check.
            is_valid, error_msg = _validate_json_content_recursive(json_content)

        if not is_valid:
            return False, error_msg

    except (TypeError, ValueError):
        return False, "Invalid JSON structure"
    except Exception:
        # Validation must never raise into the serializer; treat any other
        # failure (e.g. RecursionError while serialising) as invalid input.
        return False, "Failed to validate JSON content"

    return True, None
|
||||||
|
|
||||||
|
|
||||||
|
# Attribute names whose string values are checked for dangerous patterns.
_CHECKED_ATTR_NAMES = frozenset(
    {"href", "src", "action", "onclick", "onload", "onerror"}
)


def _find_dangerous_attr(attrs):
    """Return the name of the first checked attribute with a dangerous value, else None."""
    if not isinstance(attrs, dict):
        return None

    for attr_name, attr_value in attrs.items():
        if not isinstance(attr_value, str):
            continue
        if attr_name.lower() not in _CHECKED_ATTR_NAMES:
            continue
        if any(
            re.search(pattern, attr_value, re.IGNORECASE)
            for pattern in DANGEROUS_ATTR_PATTERNS
        ):
            return attr_name

    return None


def _validate_json_content_array(content, depth=0):
    """
    Validate JSON content array for suspicious patterns.

    For every dict node this checks the text of ``type == "text"`` nodes,
    the node's own ``attrs``, the ``attrs`` of each entry in ``marks``
    (where ProseMirror/Tiptap keep link hrefs), and then nested ``content``.

    Args:
        content (list): Array of content nodes to validate
        depth (int): Current recursion depth (default: 0)

    Returns:
        tuple: (is_valid: bool, error_message: str or None)
    """
    # Check recursion depth to prevent stack overflow
    if depth > MAX_RECURSION_DEPTH:
        return False, f"Maximum recursion depth ({MAX_RECURSION_DEPTH}) exceeded"

    if not isinstance(content, list):
        return True, None

    for node in content:
        if not isinstance(node, dict):
            continue

        # Text nodes: scan the text itself. A non-string "text" raises
        # TypeError here, which the public entry point reports as invalid.
        if node.get("type") == "text" and "text" in node:
            text_content = node["text"]
            if any(
                re.search(pattern, text_content, re.IGNORECASE)
                for pattern in DANGEROUS_TEXT_PATTERNS
            ):
                return (
                    False,
                    "JSON content contains suspicious script patterns in text",
                )

        # Gather the node's attrs plus the attrs of each mark.
        attr_sources = [node.get("attrs")]
        marks = node.get("marks")
        if isinstance(marks, list):
            attr_sources.extend(
                mark.get("attrs") for mark in marks if isinstance(mark, dict)
            )

        for attrs in attr_sources:
            bad_attr = _find_dangerous_attr(attrs)
            if bad_attr is not None:
                return (
                    False,
                    f"JSON content contains dangerous pattern in {bad_attr} attribute",
                )

        # Recursively check nested content
        children = node.get("content")
        if isinstance(children, list):
            is_valid, error_msg = _validate_json_content_array(children, depth + 1)
            if not is_valid:
                return False, error_msg

    return True, None
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_json_content_recursive(obj, depth=0):
    """
    Recursively validate JSON object for suspicious content.

    Every string reachable through dict values and list items is matched
    against ``DANGEROUS_TEXT_PATTERNS``; dict keys are not inspected.
    Non-string primitives are always accepted.

    Args:
        obj: JSON object (dict, list, or primitive) to validate
        depth (int): Current recursion depth (default: 0)

    Returns:
        tuple: (is_valid: bool, error_message: str or None)
    """
    # Check recursion depth to prevent stack overflow
    if depth > MAX_RECURSION_DEPTH:
        return False, f"Maximum recursion depth ({MAX_RECURSION_DEPTH}) exceeded"

    if isinstance(obj, dict):
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    elif isinstance(obj, str):
        children = (obj,)
    else:
        return True, None

    for child in children:
        if isinstance(child, str):
            # Strings are checked in place, without a deeper call, so they do
            # not count against the depth limit. This now also covers strings
            # held directly in lists, which were previously never scanned.
            if any(
                re.search(pattern, child, re.IGNORECASE)
                for pattern in DANGEROUS_TEXT_PATTERNS
            ):
                return (
                    False,
                    "JSON content contains suspicious script patterns",
                )
        elif isinstance(child, (dict, list)):
            is_valid, error_msg = _validate_json_content_recursive(child, depth + 1)
            if not is_valid:
                return False, error_msg

    return True, None
|
||||||
Loading…
Add table
Add a link
Reference in a new issue