diff --git a/apps/api/plane/bgtasks/work_item_link_task.py b/apps/api/plane/bgtasks/work_item_link_task.py
index 7ceaacaf5..442396c7f 100644
--- a/apps/api/plane/bgtasks/work_item_link_task.py
+++ b/apps/api/plane/bgtasks/work_item_link_task.py
@@ -1,6 +1,10 @@
+# Copyright (c) 2023-present Plane Software, Inc. and contributors
+# SPDX-License-Identifier: AGPL-3.0-only
+# See the LICENSE file for details.
+
 # Python imports
 import logging
-
+import socket
 
 # Third party imports
 from celery import shared_task
@@ -20,6 +24,48 @@ logger = logging.getLogger("plane.worker")
 DEFAULT_FAVICON = "PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSIyNCIgaGVpZ2h0PSIyNCIgdmlld0JveD0iMCAwIDI0IDI0IiBmaWxsPSJub25lIiBzdHJva2U9ImN1cnJlbnRDb2xvciIgc3Ryb2tlLXdpZHRoPSIyIiBzdHJva2UtbGluZWNhcD0icm91bmQiIHN0cm9rZS1saW5lam9pbj0icm91bmQiIGNsYXNzPSJsdWNpZGUgbHVjaWRlLWxpbmstaWNvbiBsdWNpZGUtbGluayI+PHBhdGggZD0iTTEwIDEzYTUgNSAwIDAgMCA3LjU0LjU0bDMtM2E1IDUgMCAwIDAtNy4wNy03LjA3bC0xLjcyIDEuNzEiLz48cGF0aCBkPSJNMTQgMTFhNSA1IDAgMCAwLTcuNTQtLjU0bC0zIDNhNSA1IDAgMCAwIDcuMDcgNy4wN2wxLjcxLTEuNzEiLz48L3N2Zz4="  # noqa: E501
 
 
+def validate_url_ip(url: str) -> None:
+    """
+    Validate that a URL doesn't point to a private/internal IP address.
+    Resolves hostnames to IPs before checking.
+
+    Args:
+        url: The URL to validate
+
+    Raises:
+        ValueError: If the URL points to a private/internal IP
+    """
+    parsed = urlparse(url)
+    hostname = parsed.hostname
+
+    if not hostname:
+        raise ValueError("Invalid URL: No hostname found")
+
+    # Only allow HTTP and HTTPS to prevent file://, gopher://, etc.
+    if parsed.scheme not in ("http", "https"):
+        raise ValueError("Invalid URL scheme. Only HTTP and HTTPS are allowed")
+
+    # Resolve hostname to IP addresses — this catches domain names that
+    # point to internal IPs (e.g. attacker.com -> 169.254.169.254)
+
+    try:
+        addr_info = socket.getaddrinfo(hostname, None)
+    except socket.gaierror:
+        raise ValueError("Hostname could not be resolved")
+
+    if not addr_info:
+        raise ValueError("No IP addresses found for the hostname")
+
+    # Check every resolved IP against blocked ranges to prevent SSRF
+    for addr in addr_info:
+        ip = ipaddress.ip_address(addr[4][0])
+        if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
+            raise ValueError("Access to private/internal networks is not allowed")
+
+
+MAX_REDIRECTS = 5
+
+
 def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
     """
     Crawls a URL to extract the title and favicon.
@@ -31,17 +77,6 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
         str: JSON string containing title and base64-encoded favicon
     """
     try:
-        # Prevent access to private IP ranges
-        parsed = urlparse(url)
-
-        try:
-            ip = ipaddress.ip_address(parsed.hostname)
-            if ip.is_private or ip.is_loopback or ip.is_reserved:
-                raise ValueError("Access to private/internal networks is not allowed")
-        except ValueError:
-            # Not an IP address, continue with domain validation
-            pass
-
         # Set up headers to mimic a real browser
         headers = {
             "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"  # noqa: E501
@@ -49,9 +84,28 @@
 
         soup = None
         title = None
+        final_url = url
+
+        validate_url_ip(final_url)
 
         try:
-            response = requests.get(url, headers=headers, timeout=1)
+            # Manually follow redirects to validate each URL before requesting
+            redirect_count = 0
+            response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False)
+
+            while response.is_redirect and redirect_count < MAX_REDIRECTS:
+                redirect_url = response.headers.get("Location")
+                if not redirect_url:
+                    break
+                # Resolve relative redirects against current URL
+                final_url = urljoin(final_url, redirect_url)
+                # Validate the redirect target BEFORE making the request
+                validate_url_ip(final_url)
+                redirect_count += 1
+                response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False)
+
+            if redirect_count >= MAX_REDIRECTS:
+                logger.warning(f"Too many redirects for URL: {url}")
 
             soup = BeautifulSoup(response.content, "html.parser")
             title_tag = soup.find("title")
@@ -60,8 +114,8 @@
         except requests.RequestException as e:
             logger.warning(f"Failed to fetch HTML for title: {str(e)}")
 
-        # Fetch and encode favicon
-        favicon_base64 = fetch_and_encode_favicon(headers, soup, url)
+        # Fetch and encode favicon using final URL (after redirects)
+        favicon_base64 = fetch_and_encode_favicon(headers, soup, final_url)
 
         # Prepare result
         result = {
@@ -107,7 +161,9 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[s
     for selector in favicon_selectors:
         favicon_tag = soup.select_one(selector)
         if favicon_tag and favicon_tag.get("href"):
-            return urljoin(base_url, favicon_tag["href"])
+            favicon_href = urljoin(base_url, favicon_tag["href"])
+            validate_url_ip(favicon_href)
+            return favicon_href
 
     # Fallback to /favicon.ico
     parsed_url = urlparse(base_url)
@@ -115,7 +171,9 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[s
 
     # Check if fallback exists
     try:
-        response = requests.head(fallback_url, timeout=2)
+        validate_url_ip(fallback_url)
+        response = requests.head(fallback_url, timeout=2, allow_redirects=False)
+
         if response.status_code == 200:
             return fallback_url
     except requests.RequestException as e:
@@ -146,6 +204,8 @@ def fetch_and_encode_favicon(
             "favicon_base64": f"data:image/svg+xml;base64,{DEFAULT_FAVICON}",
         }
 
+    validate_url_ip(favicon_url)
+
     response = requests.get(favicon_url, headers=headers, timeout=1)
 
     # Get content type
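
A minimal usage sketch (illustration only, not part of the diff) of how the new validate_url_ip guard is expected to behave, assuming it can be imported from the module changed above; the URLs are examples, and example.com is assumed to resolve to a public address:

    # Exercising validate_url_ip() from plane.bgtasks.work_item_link_task.
    from plane.bgtasks.work_item_link_task import validate_url_ip

    # Public host over HTTPS resolves to a public IP: no exception raised.
    validate_url_ip("https://example.com/page")

    # Loopback, link-local (cloud metadata), and non-HTTP(S) schemes all raise.
    for blocked in (
        "http://127.0.0.1:8000/admin",              # loopback address
        "http://169.254.169.254/latest/meta-data",  # link-local metadata IP
        "file://localhost/etc/passwd",              # disallowed scheme
    ):
        try:
            validate_url_ip(blocked)
        except ValueError as exc:
            print(f"rejected {blocked}: {exc}")

Because crawl_work_item_link_title_and_favicon now issues every request with allow_redirects=False and re-validates each Location target before following it, a public URL that redirects to an internal address is rejected mid-chain instead of being silently followed.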