# Python imports import logging import time # Django imports from django.http import HttpRequest # Third party imports from rest_framework.request import Request # Module imports from plane.utils.ip_address import get_client_ip from plane.db.models import APIActivityLog api_logger = logging.getLogger("plane.api.request") class RequestLoggerMiddleware: def __init__(self, get_response): self.get_response = get_response def _should_log_route(self, request: Request | HttpRequest) -> bool: """ Determines whether a route should be logged based on the request and status code. """ # Don't log health checks if request.path == "/" and request.method == "GET": return False return True def __call__(self, request): # get the start time start_time = time.time() # Get the response response = self.get_response(request) # calculate the duration duration = time.time() - start_time # Check if logging is required log_true = self._should_log_route(request=request) # If logging is not required, return the response if not log_true: return response user_id = ( request.user.id if getattr(request, "user") and getattr(request.user, "is_authenticated", False) else None ) user_agent = request.META.get("HTTP_USER_AGENT", "") # Log the request information api_logger.info( f"{request.method} {request.get_full_path()} {response.status_code}", extra={ "path": request.path, "method": request.method, "status_code": response.status_code, "duration_ms": int(duration * 1000), "remote_addr": get_client_ip(request), "user_agent": user_agent, "user_id": user_id, }, ) # return the response return response class APITokenLogMiddleware: def __init__(self, get_response): self.get_response = get_response def __call__(self, request): request_body = request.body response = self.get_response(request) self.process_request(request, response, request_body) return response def _safe_decode_body(self, content): """ Safely decodes request/response body content, handling binary data. Returns None if content is None, or a string representation of the content. """ # If the content is None, return None if content is None: return None # If the content is an empty bytes object, return None if content == b"": return None # Check if content is binary by looking for common binary file signatures if ( content.startswith(b"\x89PNG") or content.startswith(b"\xff\xd8\xff") or content.startswith(b"%PDF") ): return "[Binary Content]" try: return content.decode("utf-8") except UnicodeDecodeError: return "[Could not decode content]" def process_request(self, request, response, request_body): api_key_header = "X-Api-Key" api_key = request.headers.get(api_key_header) # If the API key is present, log the request if api_key: try: APIActivityLog.objects.create( token_identifier=api_key, path=request.path, method=request.method, query_params=request.META.get("QUERY_STRING", ""), headers=str(request.headers), body=( self._safe_decode_body(request_body) if request_body else None ), response_body=( self._safe_decode_body(response.content) if response.content else None ), response_code=response.status_code, ip_address=get_client_ip(request=request), user_agent=request.META.get("HTTP_USER_AGENT", None), ) except Exception as e: api_logger.exception(e) # If the token does not exist, you can decide whether to log this as an invalid attempt return None