# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

# Python imports
import os
import uuid

# Third party imports
import boto3
from botocore.exceptions import ClientError
from urllib.parse import quote

# Module imports
from plane.utils.exception_logger import log_exception
from storages.backends.s3boto3 import S3Boto3Storage


class S3Storage(S3Boto3Storage):
    """S3 storage class to generate presigned URLs for S3 objects"""

    def url(self, name, parameters=None, expire=None, http_method=None):
        """Return ``name`` unchanged.

        The extra parameters are accepted for signature compatibility and are
        ignored; use ``generate_presigned_url`` to obtain a signed URL.
        """
        return name

    def __init__(self, request=None):
        """Read storage settings from the environment and build the S3 client.

        Args:
            request: Optional request object. When ``USE_MINIO`` is "1" and a
                request is given, its host (and, unless ``MINIO_ENDPOINT_SSL``
                is "1", its scheme) is used to build the client endpoint URL.
        """
        # NOTE(review): the parent initializer is not invoked here; confirm
        # that is intentional before adding a super().__init__() call.

        # Get the AWS credentials and bucket name from the environment
        self.aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
        # Use the AWS_SECRET_ACCESS_KEY environment variable for the secret key
        self.aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
        # Use the AWS_S3_BUCKET_NAME environment variable for the bucket name
        self.aws_storage_bucket_name = os.environ.get("AWS_S3_BUCKET_NAME")
        # Use the AWS_REGION environment variable for the region
        self.aws_region = os.environ.get("AWS_REGION")
        # Use the AWS_S3_ENDPOINT_URL environment variable for the endpoint URL
        self.aws_s3_endpoint_url = os.environ.get("AWS_S3_ENDPOINT_URL") or os.environ.get("MINIO_ENDPOINT_URL")
        # Use the SIGNED_URL_EXPIRATION environment variable for the expiration time (default: 3600 seconds)
        self.signed_url_expiration = int(os.environ.get("SIGNED_URL_EXPIRATION", "3600"))

        # Both the MinIO and the plain S3 client share every setting except
        # the endpoint URL, so resolve that first and build the client once.
        endpoint_url = self.aws_s3_endpoint_url
        if os.environ.get("USE_MINIO") == "1" and request:
            # Determine protocol based on environment variable
            if os.environ.get("MINIO_ENDPOINT_SSL") == "1":
                endpoint_protocol = "https"
            else:
                endpoint_protocol = request.scheme
            endpoint_url = f"{endpoint_protocol}://{request.get_host()}"

        # Create the S3 client (SigV4 signing in both modes)
        self.s3_client = boto3.client(
            "s3",
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            region_name=self.aws_region,
            endpoint_url=endpoint_url,
            config=boto3.session.Config(signature_version="s3v4"),
        )

    def generate_presigned_post(self, object_name, file_type, file_size, expiration=None):
        """Generate a presigned URL for browser-direct upload.

        BB-PATCH (binarybeachio fork): method name preserved for caller compat,
        but this now mints a presigned PUT URL — not POST.

        Why: Cloudflare R2 and Backblaze B2 — the two most common self-host
        S3-compatible backends — do NOT implement S3 PostObject. Both return
        HTTP 501 NotImplemented for the bucket-form POST verb that vanilla
        Plane uses. Confirmed empirically against both backends 2026-04-30.
        Rolling our own backend support isn't tractable; PUT is universally
        supported (R2, B2, AWS S3, MinIO, Wasabi, etc).

        Tradeoff: we lose signed enforcement of `content-length-range`. Size is
        still validated server-side at presign time via the `file_size` param
        (see callers: 413 raised before we get here), so a determined client
        could only over-upload by misreporting the size — they'd be capped by
        the bucket's max-file-size at worst.

        See docs/features/storage-upload-flow.md in the binarybeachio repo for
        the full decision record + rollback procedure (`git revert` this commit
        and rebuild the images).

        Args:
            object_name: Key of the object to be uploaded.
            file_type: Content type to sign; an empty/None value falls back to
                "application/octet-stream".
            file_size: Kept for caller compatibility; it is not part of the
                signed request in this method.
            expiration: Lifetime of the URL in seconds. Defaults to
                ``self.signed_url_expiration``.

        Returns:
            A dict with "url", "method" and "fields" keys, or None when the
            client raises ``ClientError``.
        """
        if expiration is None:
            expiration = self.signed_url_expiration

        # Default to application/octet-stream when caller passes empty/None.
        # The file-type library Plane's frontend uses returns "" for unsniffable
        # formats (plain text, .json, etc.), which would sign a presigned URL
        # with `Content-Type=""`. Browsers can't reliably send an empty
        # Content-Type header, so the SigV4 signature would never match and PUT
        # would 403. We resolve this by signing a definite default; the
        # frontend then sends the signed value verbatim (see helper.ts).
        signed_content_type = file_type or "application/octet-stream"

        try:
            url = self.s3_client.generate_presigned_url(
                "put_object",
                Params={
                    "Bucket": self.aws_storage_bucket_name,
                    "Key": object_name,
                    "ContentType": signed_content_type,
                },
                ExpiresIn=expiration,
                HttpMethod="PUT",
            )
        except ClientError as e:
            # Report through the shared exception logger, like every other
            # method of this class, instead of printing to stdout.
            log_exception(e)
            return None

        return {
            "url": url,
            "method": "PUT",
            "fields": {"Content-Type": signed_content_type, "key": object_name},
        }

    def _get_content_disposition(self, disposition, filename=None):
        """Helper method to generate Content-Disposition header value

        A ``filename`` of None is replaced by a random hex name; an empty
        string yields the bare ``disposition`` with no filename parameter.
        """
        if filename is None:
            filename = uuid.uuid4().hex

        if filename:
            # Encode the filename to handle special characters
            encoded_filename = quote(filename)
            return f"{disposition}; filename*=UTF-8''{encoded_filename}"
        return disposition

    def generate_presigned_url(
        self,
        object_name,
        expiration=None,
        http_method="GET",
        disposition="inline",
        filename=None,
    ):
        """Generate a presigned URL to share an S3 object

        Args:
            object_name: Key of the object; converted with ``str()``.
            expiration: Lifetime of the URL in seconds. Defaults to
                ``self.signed_url_expiration``.
            http_method: HTTP method passed through to the client.
            disposition: Content-Disposition type requested for the response.
            filename: Optional download filename (see
                ``_get_content_disposition``).

        Returns:
            The presigned URL, or None when the client raises ``ClientError``.
        """
        if expiration is None:
            expiration = self.signed_url_expiration

        content_disposition = self._get_content_disposition(disposition, filename)
        try:
            response = self.s3_client.generate_presigned_url(
                "get_object",
                Params={
                    "Bucket": self.aws_storage_bucket_name,
                    "Key": str(object_name),
                    "ResponseContentDisposition": content_disposition,
                },
                ExpiresIn=expiration,
                HttpMethod=http_method,
            )
        except ClientError as e:
            log_exception(e)
            return None

        # The response contains the presigned URL
        return response

    def get_object_metadata(self, object_name):
        """Get the metadata for an S3 object

        Returns:
            A dict with "ContentType", "ContentLength", "LastModified" (ISO
            formatted string or None), "ETag" and "Metadata" keys, or None when
            the client raises ``ClientError``.
        """
        try:
            response = self.s3_client.head_object(Bucket=self.aws_storage_bucket_name, Key=object_name)
        except ClientError as e:
            log_exception(e)
            return None

        last_modified = response.get("LastModified")
        return {
            "ContentType": response.get("ContentType"),
            "ContentLength": response.get("ContentLength"),
            "LastModified": last_modified.isoformat() if last_modified else None,
            "ETag": response.get("ETag"),
            "Metadata": response.get("Metadata", {}),
        }

    def copy_object(self, object_name, new_object_name):
        """Copy an S3 object to a new location

        Both source and destination live in ``self.aws_storage_bucket_name``.

        Returns:
            The client response, or None when the client raises
            ``ClientError``.
        """
        try:
            response = self.s3_client.copy_object(
                Bucket=self.aws_storage_bucket_name,
                CopySource={"Bucket": self.aws_storage_bucket_name, "Key": object_name},
                Key=new_object_name,
            )
        except ClientError as e:
            log_exception(e)
            return None

        return response

    def upload_file(
        self,
        file_obj,
        object_name: str,
        content_type: str = None,
        extra_args: dict = None,
    ) -> bool:
        """Upload a file directly to S3

        Args:
            file_obj: File-like object handed to ``upload_fileobj``.
            object_name: Destination key.
            content_type: Optional content type; when truthy it is sent as the
                "ContentType" extra argument.
            extra_args: Optional extra arguments for the upload. The mapping is
                copied, so the caller's dict is never modified.

        Returns:
            True on success, False when the client raises ``ClientError``.
        """
        # Work on a fresh dict for every call: a shared mutable default would
        # carry a ContentType set by one upload over into later uploads.
        upload_args = dict(extra_args) if extra_args else {}
        if content_type:
            upload_args["ContentType"] = content_type

        try:
            self.s3_client.upload_fileobj(
                file_obj,
                self.aws_storage_bucket_name,
                object_name,
                ExtraArgs=upload_args,
            )
            return True
        except ClientError as e:
            log_exception(e)
            return False

    def delete_files(self, object_names):
        """Delete one or more S3 objects in a single request

        Args:
            object_names: Iterable of object keys to delete.

        Returns:
            True when the request succeeds, False when the client raises
            ``ClientError``.
        """
        try:
            self.s3_client.delete_objects(
                Bucket=self.aws_storage_bucket_name,
                Delete={"Objects": [{"Key": object_name} for object_name in object_names]},
            )
            return True
        except ClientError as e:
            log_exception(e)
            return False