fix: validate redirects in favicon fetching to prevent SSRF (#8858)

* fix: validate redirects in favicon fetching to prevent SSRF

The previous SSRF fix (GHSA-jcc6-f9v6-f7jw) only validated redirects for
the main page URL but not for the favicon fetch path. An attacker could
craft an HTML page with a favicon link that redirects to a private IP,
bypassing the IP validation and leaking internal network data as base64.

Extract a reusable `safe_get()` function that validates every redirect hop
against private/internal IPs and use it for both page and favicon fetches.

Resolves: GHSA-9fr2-pprw-pp9j

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: address PR review feedback for SSRF favicon fix

- Fix off-by-one in redirect limit: only raise RuntimeError when the
  response is still a redirect after MAX_REDIRECTS hops, not when the
  final response is a successful 200
- Return final URL from safe_get() so favicon href resolution uses the
  correct origin after redirects instead of the original URL
- Add unit tests for validate_url_ip and safe_get covering private IP
  blocking, redirect-following, and redirect limit enforcement

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
sriram veeraghanta 2026-04-06 16:04:43 +05:30 committed by GitHub
parent 587fe76032
commit 63fac3b8c4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 178 additions and 24 deletions

View file

@ -13,7 +13,7 @@ from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
import base64
import ipaddress
from typing import Dict, Any
from typing import Dict, Any, Tuple
from typing import Optional
from plane.db.models import IssueLink
from plane.utils.exception_logger import log_exception
@ -66,6 +66,52 @@ def validate_url_ip(url: str) -> None:
MAX_REDIRECTS = 5
def safe_get(
url: str,
headers: Optional[Dict[str, str]] = None,
timeout: int = 1,
) -> Tuple[requests.Response, str]:
"""
Perform a GET request that validates every redirect hop against private IPs.
Prevents SSRF by ensuring no redirect lands on a private/internal address.
Args:
url: The URL to fetch
headers: Optional request headers
timeout: Request timeout in seconds
Returns:
A tuple of (final Response object, final URL after redirects)
Raises:
ValueError: If any URL in the redirect chain points to a private IP
requests.RequestException: On network errors
RuntimeError: If max redirects exceeded
"""
validate_url_ip(url)
current_url = url
response = requests.get(
current_url, headers=headers, timeout=timeout, allow_redirects=False
)
redirect_count = 0
while response.is_redirect:
if redirect_count >= MAX_REDIRECTS:
raise RuntimeError(f"Too many redirects for URL: {url}")
redirect_url = response.headers.get("Location")
if not redirect_url:
break
current_url = urljoin(current_url, redirect_url)
validate_url_ip(current_url)
redirect_count += 1
response = requests.get(
current_url, headers=headers, timeout=timeout, allow_redirects=False
)
return response, current_url
def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
"""
Crawls a URL to extract the title and favicon.
@ -86,26 +132,8 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
title = None
final_url = url
validate_url_ip(final_url)
try:
# Manually follow redirects to validate each URL before requesting
redirect_count = 0
response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False)
while response.is_redirect and redirect_count < MAX_REDIRECTS:
redirect_url = response.headers.get("Location")
if not redirect_url:
break
# Resolve relative redirects against current URL
final_url = urljoin(final_url, redirect_url)
# Validate the redirect target BEFORE making the request
validate_url_ip(final_url)
redirect_count += 1
response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False)
if redirect_count >= MAX_REDIRECTS:
logger.warning(f"Too many redirects for URL: {url}")
response, final_url = safe_get(url, headers=headers)
soup = BeautifulSoup(response.content, "html.parser")
title_tag = soup.find("title")
@ -113,8 +141,10 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
except requests.RequestException as e:
logger.warning(f"Failed to fetch HTML for title: {str(e)}")
except (ValueError, RuntimeError) as e:
logger.warning(f"URL validation failed: {str(e)}")
# Fetch and encode favicon using final URL (after redirects)
# Fetch and encode favicon using final URL (after redirects) for correct relative href resolution
favicon_base64 = fetch_and_encode_favicon(headers, soup, final_url)
# Prepare result
@ -204,9 +234,7 @@ def fetch_and_encode_favicon(
"favicon_base64": f"data:image/svg+xml;base64,{DEFAULT_FAVICON}",
}
validate_url_ip(favicon_url)
response = requests.get(favicon_url, headers=headers, timeout=1)
response, _ = safe_get(favicon_url, headers=headers)
# Get content type
content_type = response.headers.get("content-type", "image/x-icon")

View file

@ -0,0 +1,126 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
import pytest
from unittest.mock import patch, MagicMock
from plane.bgtasks.work_item_link_task import safe_get, validate_url_ip
def _make_response(status_code=200, headers=None, is_redirect=False, content=b""):
"""Create a mock requests.Response."""
resp = MagicMock()
resp.status_code = status_code
resp.is_redirect = is_redirect
resp.headers = headers or {}
resp.content = content
return resp
@pytest.mark.unit
class TestValidateUrlIp:
"""Test validate_url_ip blocks private/internal IPs."""
def test_rejects_private_ip(self):
with patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo") as mock_dns:
mock_dns.return_value = [(None, None, None, None, ("192.168.1.1", 0))]
with pytest.raises(ValueError, match="private/internal"):
validate_url_ip("http://example.com")
def test_rejects_loopback(self):
with patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo") as mock_dns:
mock_dns.return_value = [(None, None, None, None, ("127.0.0.1", 0))]
with pytest.raises(ValueError, match="private/internal"):
validate_url_ip("http://example.com")
def test_rejects_non_http_scheme(self):
with pytest.raises(ValueError, match="Only HTTP and HTTPS"):
validate_url_ip("file:///etc/passwd")
def test_allows_public_ip(self):
with patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo") as mock_dns:
mock_dns.return_value = [(None, None, None, None, ("93.184.216.34", 0))]
validate_url_ip("https://example.com") # Should not raise
@pytest.mark.unit
class TestSafeGet:
"""Test safe_get follows redirects safely and blocks SSRF."""
@patch("plane.bgtasks.work_item_link_task.requests.get")
@patch("plane.bgtasks.work_item_link_task.validate_url_ip")
def test_returns_response_for_non_redirect(self, mock_validate, mock_get):
final_resp = _make_response(status_code=200, content=b"OK")
mock_get.return_value = final_resp
response, final_url = safe_get("https://example.com")
assert response is final_resp
assert final_url == "https://example.com"
mock_validate.assert_called_once_with("https://example.com")
@patch("plane.bgtasks.work_item_link_task.requests.get")
@patch("plane.bgtasks.work_item_link_task.validate_url_ip")
def test_follows_redirect_and_validates_each_hop(self, mock_validate, mock_get):
redirect_resp = _make_response(
status_code=301,
is_redirect=True,
headers={"Location": "https://other.com/page"},
)
final_resp = _make_response(status_code=200, content=b"OK")
mock_get.side_effect = [redirect_resp, final_resp]
response, final_url = safe_get("https://example.com")
assert response is final_resp
assert final_url == "https://other.com/page"
# Should validate both the initial URL and the redirect target
assert mock_validate.call_count == 2
mock_validate.assert_any_call("https://example.com")
mock_validate.assert_any_call("https://other.com/page")
@patch("plane.bgtasks.work_item_link_task.requests.get")
@patch("plane.bgtasks.work_item_link_task.validate_url_ip")
def test_blocks_redirect_to_private_ip(self, mock_validate, mock_get):
redirect_resp = _make_response(
status_code=302,
is_redirect=True,
headers={"Location": "http://192.168.1.1:8080"},
)
mock_get.return_value = redirect_resp
# First call (initial URL) succeeds, second call (redirect target) fails
mock_validate.side_effect = [None, ValueError("Access to private/internal networks is not allowed")]
with pytest.raises(ValueError, match="private/internal"):
safe_get("https://evil.com/redirect")
@patch("plane.bgtasks.work_item_link_task.requests.get")
@patch("plane.bgtasks.work_item_link_task.validate_url_ip")
def test_raises_on_too_many_redirects(self, mock_validate, mock_get):
redirect_resp = _make_response(
status_code=302,
is_redirect=True,
headers={"Location": "https://example.com/loop"},
)
mock_get.return_value = redirect_resp
with pytest.raises(RuntimeError, match="Too many redirects"):
safe_get("https://example.com/start")
@patch("plane.bgtasks.work_item_link_task.requests.get")
@patch("plane.bgtasks.work_item_link_task.validate_url_ip")
def test_succeeds_at_exact_max_redirects(self, mock_validate, mock_get):
"""After exactly MAX_REDIRECTS hops, if the final response is 200, it should succeed."""
redirect_resp = _make_response(
status_code=302,
is_redirect=True,
headers={"Location": "https://example.com/next"},
)
final_resp = _make_response(status_code=200, content=b"OK")
# 5 redirects then a 200
mock_get.side_effect = [redirect_resp] * 5 + [final_resp]
response, final_url = safe_get("https://example.com/start")
assert response is final_resp
assert not response.is_redirect