[SECUR-113] fix: ssrf for work item links (#8607)

This commit is contained in:
Sangeetha 2026-02-05 15:03:43 +05:30 committed by sriramveeraghanta
parent 95d121ce38
commit b783f25bfa

View file

@ -1,6 +1,10 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
# Python imports
import logging
import socket
# Third party imports
from celery import shared_task
@ -20,6 +24,48 @@ logger = logging.getLogger("plane.worker")
DEFAULT_FAVICON = "PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSIyNCIgaGVpZ2h0PSIyNCIgdmlld0JveD0iMCAwIDI0IDI0IiBmaWxsPSJub25lIiBzdHJva2U9ImN1cnJlbnRDb2xvciIgc3Ryb2tlLXdpZHRoPSIyIiBzdHJva2UtbGluZWNhcD0icm91bmQiIHN0cm9rZS1saW5lam9pbj0icm91bmQiIGNsYXNzPSJsdWNpZGUgbHVjaWRlLWxpbmstaWNvbiBsdWNpZGUtbGluayI+PHBhdGggZD0iTTEwIDEzYTUgNSAwIDAgMCA3LjU0LjU0bDMtM2E1IDUgMCAwIDAtNy4wNy03LjA3bC0xLjcyIDEuNzEiLz48cGF0aCBkPSJNMTQgMTFhNSA1IDAgMCAwLTcuNTQtLjU0bC0zIDNhNSA1IDAgMCAwIDcuMDcgNy4wN2wxLjcxLTEuNzEiLz48L3N2Zz4=" # noqa: E501
def validate_url_ip(url: str) -> None:
"""
Validate that a URL doesn't point to a private/internal IP address.
Resolves hostnames to IPs before checking.
Args:
url: The URL to validate
Raises:
ValueError: If the URL points to a private/internal IP
"""
parsed = urlparse(url)
hostname = parsed.hostname
if not hostname:
raise ValueError("Invalid URL: No hostname found")
# Only allow HTTP and HTTPS to prevent file://, gopher://, etc.
if parsed.scheme not in ("http", "https"):
raise ValueError("Invalid URL scheme. Only HTTP and HTTPS are allowed")
# Resolve hostname to IP addresses — this catches domain names that
# point to internal IPs (e.g. attacker.com -> 169.254.169.254)
try:
addr_info = socket.getaddrinfo(hostname, None)
except socket.gaierror:
raise ValueError("Hostname could not be resolved")
if not addr_info:
raise ValueError("No IP addresses found for the hostname")
# Check every resolved IP against blocked ranges to prevent SSRF
for addr in addr_info:
ip = ipaddress.ip_address(addr[4][0])
if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
raise ValueError("Access to private/internal networks is not allowed")
MAX_REDIRECTS = 5
def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
"""
Crawls a URL to extract the title and favicon.
@ -31,17 +77,6 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
str: JSON string containing title and base64-encoded favicon
"""
try:
# Prevent access to private IP ranges
parsed = urlparse(url)
try:
ip = ipaddress.ip_address(parsed.hostname)
if ip.is_private or ip.is_loopback or ip.is_reserved:
raise ValueError("Access to private/internal networks is not allowed")
except ValueError:
# Not an IP address, continue with domain validation
pass
# Set up headers to mimic a real browser
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" # noqa: E501
@ -49,9 +84,28 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
soup = None
title = None
final_url = url
validate_url_ip(final_url)
try:
response = requests.get(url, headers=headers, timeout=1)
# Manually follow redirects to validate each URL before requesting
redirect_count = 0
response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False)
while response.is_redirect and redirect_count < MAX_REDIRECTS:
redirect_url = response.headers.get("Location")
if not redirect_url:
break
# Resolve relative redirects against current URL
final_url = urljoin(final_url, redirect_url)
# Validate the redirect target BEFORE making the request
validate_url_ip(final_url)
redirect_count += 1
response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False)
if redirect_count >= MAX_REDIRECTS:
logger.warning(f"Too many redirects for URL: {url}")
soup = BeautifulSoup(response.content, "html.parser")
title_tag = soup.find("title")
@ -60,8 +114,8 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
except requests.RequestException as e:
logger.warning(f"Failed to fetch HTML for title: {str(e)}")
# Fetch and encode favicon
favicon_base64 = fetch_and_encode_favicon(headers, soup, url)
# Fetch and encode favicon using final URL (after redirects)
favicon_base64 = fetch_and_encode_favicon(headers, soup, final_url)
# Prepare result
result = {
@ -107,7 +161,9 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[s
for selector in favicon_selectors:
favicon_tag = soup.select_one(selector)
if favicon_tag and favicon_tag.get("href"):
return urljoin(base_url, favicon_tag["href"])
favicon_href = urljoin(base_url, favicon_tag["href"])
validate_url_ip(favicon_href)
return favicon_href
# Fallback to /favicon.ico
parsed_url = urlparse(base_url)
@ -115,7 +171,9 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[s
# Check if fallback exists
try:
response = requests.head(fallback_url, timeout=2)
validate_url_ip(fallback_url)
response = requests.head(fallback_url, timeout=2, allow_redirects=False)
if response.status_code == 200:
return fallback_url
except requests.RequestException as e:
@ -146,6 +204,8 @@ def fetch_and_encode_favicon(
"favicon_base64": f"data:image/svg+xml;base64,{DEFAULT_FAVICON}",
}
validate_url_ip(favicon_url)
response = requests.get(favicon_url, headers=headers, timeout=1)
# Get content type