[SECUR-113] fix: ssrf for work item links (#8607)
This commit is contained in:
parent
587cb3ecfe
commit
d191615a5e
1 changed files with 48 additions and 15 deletions
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
# Python imports
|
||||
import logging
|
||||
import socket
|
||||
|
||||
# Third party imports
|
||||
from celery import shared_task
|
||||
|
|
@ -26,7 +27,7 @@ DEFAULT_FAVICON = "PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoP
|
|||
def validate_url_ip(url: str) -> None:
|
||||
"""
|
||||
Validate that a URL doesn't point to a private/internal IP address.
|
||||
Only checks if the hostname is a direct IP address.
|
||||
Resolves hostnames to IPs before checking.
|
||||
|
||||
Args:
|
||||
url: The URL to validate
|
||||
|
|
@ -38,17 +39,31 @@ def validate_url_ip(url: str) -> None:
|
|||
hostname = parsed.hostname
|
||||
|
||||
if not hostname:
|
||||
return
|
||||
raise ValueError("Invalid URL: No hostname found")
|
||||
|
||||
# Only allow HTTP and HTTPS to prevent file://, gopher://, etc.
|
||||
if parsed.scheme not in ("http", "https"):
|
||||
raise ValueError("Invalid URL scheme. Only HTTP and HTTPS are allowed")
|
||||
|
||||
# Resolve hostname to IP addresses — this catches domain names that
|
||||
# point to internal IPs (e.g. attacker.com -> 169.254.169.254)
|
||||
|
||||
try:
|
||||
ip = ipaddress.ip_address(hostname)
|
||||
except ValueError:
|
||||
# Not an IP address (it's a domain name), nothing to check here
|
||||
return
|
||||
addr_info = socket.getaddrinfo(hostname, None)
|
||||
except socket.gaierror:
|
||||
raise ValueError("Hostname could not be resolved")
|
||||
|
||||
# It IS an IP address - check if it's private/internal
|
||||
if ip.is_private or ip.is_loopback or ip.is_reserved:
|
||||
raise ValueError("Access to private/internal networks is not allowed")
|
||||
if not addr_info:
|
||||
raise ValueError("No IP addresses found for the hostname")
|
||||
|
||||
# Check every resolved IP against blocked ranges to prevent SSRF
|
||||
for addr in addr_info:
|
||||
ip = ipaddress.ip_address(addr[4][0])
|
||||
if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
|
||||
raise ValueError("Access to private/internal networks is not allowed")
|
||||
|
||||
|
||||
MAX_REDIRECTS = 5
|
||||
|
||||
|
||||
def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
|
||||
|
|
@ -74,11 +89,23 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
|
|||
validate_url_ip(final_url)
|
||||
|
||||
try:
|
||||
response = requests.get(final_url, headers=headers, timeout=1)
|
||||
final_url = response.url # Get the final URL after any redirects
|
||||
# Manually follow redirects to validate each URL before requesting
|
||||
redirect_count = 0
|
||||
response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False)
|
||||
|
||||
# check for redirected url also
|
||||
validate_url_ip(final_url)
|
||||
while response.is_redirect and redirect_count < MAX_REDIRECTS:
|
||||
redirect_url = response.headers.get("Location")
|
||||
if not redirect_url:
|
||||
break
|
||||
# Resolve relative redirects against current URL
|
||||
final_url = urljoin(final_url, redirect_url)
|
||||
# Validate the redirect target BEFORE making the request
|
||||
validate_url_ip(final_url)
|
||||
redirect_count += 1
|
||||
response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False)
|
||||
|
||||
if redirect_count >= MAX_REDIRECTS:
|
||||
logger.warning(f"Too many redirects for URL: {url}")
|
||||
|
||||
soup = BeautifulSoup(response.content, "html.parser")
|
||||
title_tag = soup.find("title")
|
||||
|
|
@ -134,7 +161,9 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[s
|
|||
for selector in favicon_selectors:
|
||||
favicon_tag = soup.select_one(selector)
|
||||
if favicon_tag and favicon_tag.get("href"):
|
||||
return urljoin(base_url, favicon_tag["href"])
|
||||
favicon_href = urljoin(base_url, favicon_tag["href"])
|
||||
validate_url_ip(favicon_href)
|
||||
return favicon_href
|
||||
|
||||
# Fallback to /favicon.ico
|
||||
parsed_url = urlparse(base_url)
|
||||
|
|
@ -142,7 +171,9 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[s
|
|||
|
||||
# Check if fallback exists
|
||||
try:
|
||||
response = requests.head(fallback_url, timeout=2)
|
||||
validate_url_ip(fallback_url)
|
||||
response = requests.head(fallback_url, timeout=2, allow_redirects=False)
|
||||
|
||||
if response.status_code == 200:
|
||||
return fallback_url
|
||||
except requests.RequestException as e:
|
||||
|
|
@ -173,6 +204,8 @@ def fetch_and_encode_favicon(
|
|||
"favicon_base64": f"data:image/svg+xml;base64,{DEFAULT_FAVICON}",
|
||||
}
|
||||
|
||||
validate_url_ip(favicon_url)
|
||||
|
||||
response = requests.get(favicon_url, headers=headers, timeout=1)
|
||||
|
||||
# Get content type
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue