# Copyright (c) 2023-present Plane Software, Inc. and contributors # SPDX-License-Identifier: AGPL-3.0-only # See the LICENSE file for details. # # binarybeachio fork addition — see BINARYBEACHIO.md at repo root. # # Bucket-4 trusted-JWT entry-point. Validates a short-lived RS256 JWT signed # by the binarybeachio auth-bridge (private key BRIDGE_SIGNING_KEY), enforces # single-use replay protection via shared-redis SETNX (per the contract in # `binarybeachio/docs/architecture/bridge-jwt-replay-protection.md`), then # finds-or-creates the corresponding User and starts a Django session via # the existing user_login() helper. # # Endpoint behavior when not configured: # - If BB_BRIDGE_PUBLIC_KEY_URL env is unset → 404 (endpoint disabled). # Vanilla upstream behavior is preserved out-of-the-box; the trusted-JWT # entry-point only exists in deployments that explicitly opt in. # # Public-key transport: # - Fetched at request time from BB_BRIDGE_PUBLIC_KEY_URL (typically # `http://auth-bridge-:3000/.well-known/bb-bridge.pub.pem`). # - Cached in-process for 5 minutes; auto-refreshed on signature failure # to handle bridge key rotation transparently. # - This sidesteps the env-PEM corruption issue: putting RSA PEMs through # Coolify's .env writer escapes backslashes (`\n` → `\\n`), which # corrupts the multi-line PEM. HTTP fetch never traverses that path. # See bb-activepieces-fork/.../trusted-jwt-verifier.ts module-doc for # the original write-up. # # Replay protection: # - Bridge mints with a UUIDv4 `jti` claim. # - This view atomically SETNX `bb_bridge_jti:` in shared-redis with # TTL = (exp - now) + 30s clock-skew tolerance. # - Fail closed: if Redis is unavailable, REJECT. Auth correctness > # auth availability; break-glass admin (email+password) covers operator # access during a Redis outage. import logging import os import time import uuid from typing import Optional, Tuple from urllib.parse import urlparse import jwt as pyjwt import redis import requests from django.db import transaction from django.http import HttpResponseRedirect, HttpResponseNotFound from django.views import View from plane.authentication.adapter.error import ( AUTHENTICATION_ERROR_CODES, AuthenticationException, ) from plane.authentication.utils.host import base_host from plane.authentication.utils.login import user_login from plane.authentication.utils.redirection_path import get_redirection_path from plane.authentication.utils.user_auth_workflow import post_user_auth_workflow from plane.db.models import Profile, User from plane.settings.redis import redis_instance from plane.utils.path_validator import get_safe_redirect_url log = logging.getLogger("plane.authentication.trusted") # Audience the bridge sets in JWTs minted for Plane (signBridgeJwt(..., audience: 'plane')). _EXPECTED_AUDIENCE = "plane" # Issuer the bridge sets (every adapter shares this). _EXPECTED_ISSUER = "bb-bridge" # Replay-store key prefix per bridge-jwt-replay-protection.md. _JTI_KEY_PREFIX = "bb_bridge_jti:" # Clock-skew tolerance applied to exp/iat checks. _CLOCK_SKEW_SECONDS = 30 # Public-key cache (in-process). Keyed on URL so test/dev with multiple # bridges per process is safe. _key_cache: {url: (pem, fetched_at_epoch)}. _KEY_CACHE_TTL_SECONDS = 5 * 60 _key_cache: dict[str, Tuple[str, float]] = {} def _bridge_public_key_url() -> Optional[str]: """Returns the configured bridge public-key URL, or None if disabled. The endpoint is implicitly disabled (returns 404) when this env is unset — the regression-safe default for builds shipped without the bridge wired up. """ return os.environ.get("BB_BRIDGE_PUBLIC_KEY_URL") or None def _fetch_bridge_public_key(url: str, *, force_refresh: bool = False) -> str: """Fetch (and cache) the bridge's public key PEM. Refetches on signature failure or after the cache TTL elapses. Falls back to stale cache if a refresh fails — temporarily-unreachable bridge shouldn't brick logins.""" now = time.time() cached = _key_cache.get(url) if not force_refresh and cached and (now - cached[1]) < _KEY_CACHE_TTL_SECONDS: return cached[0] try: resp = requests.get(url, timeout=3.0, headers={"accept": "application/x-pem-file"}) resp.raise_for_status() pem = resp.text if "-----BEGIN PUBLIC KEY-----" not in pem: raise ValueError(f"non-PEM body from {url} (first 80: {pem[:80]!r})") _key_cache[url] = (pem, now) return pem except Exception as exc: if cached: log.warning("bridge public-key fetch failed; using stale cache", extra={"url": url, "err": str(exc)}) return cached[0] raise def _consume_jti(jti: str, exp_epoch: int) -> Tuple[bool, Optional[str]]: """Atomically mark a `jti` consumed in shared-redis. Returns (first_use, error_code). - (True, None) → not previously consumed; admit the request. - (False, code) → either already consumed (TRUSTED_JWT_TOKEN_REPLAYED) or the replay store is unavailable (TRUSTED_JWT_REPLAY_STORE_DOWN). Either way, REJECT the request (fail closed). TTL = (exp - now) + 30s clock-skew tolerance, with a 30s minimum floor for edge cases where exp is already past at consumption time (signature still valid under clock-skew tolerance). """ if not jti or not exp_epoch: return False, "TRUSTED_JWT_TOKEN_INVALID" try: client = redis_instance() except Exception as exc: log.error("replay store init failed", extra={"err": str(exc)}) return False, "TRUSTED_JWT_REPLAY_STORE_DOWN" try: ttl = max(int(exp_epoch - time.time()) + _CLOCK_SKEW_SECONDS, 30) # SET key value NX EX ttl -- returns True on first-set, None if already set. ok = client.set(_JTI_KEY_PREFIX + jti, "1", nx=True, ex=ttl) if ok is None: return False, "TRUSTED_JWT_TOKEN_REPLAYED" return True, None except redis.RedisError as exc: log.error("replay store SETNX failed", extra={"err": str(exc), "jti": jti}) return False, "TRUSTED_JWT_REPLAY_STORE_DOWN" def _redirect_with_error(request, error_code: str, error_message: str, next_path: str) -> HttpResponseRedirect: """Surface the failure as a Plane-style redirect to the host with error params.""" exc = AuthenticationException( error_code=AUTHENTICATION_ERROR_CODES[error_code], error_message=error_message, ) return HttpResponseRedirect( get_safe_redirect_url( base_url=base_host(request=request, is_app=True), next_path=next_path, params=exc.get_error_dict(), ) ) def _verify_with_retry(token: str, public_key_url: str) -> dict: """Verify the JWT, refetching the bridge key once on signature failure to transparently handle bridge key rotation. Other verify failures (expired, wrong issuer/audience, malformed) do NOT trigger a refetch — those are tampering or clock issues, not key drift. `bb_mailbox` is intentionally NOT in the required-claims list. The bridge only emits it when Zitadel's `bb-claims` Action has propagated and the tenant's `mail_domain` org metadata is set; absent the claim, the view falls back to `email` for User keying so a partial deployment (bridge upgraded but Action not yet live) still works. See the four-layer model in `binarybeachio/docs/architecture/multi-tenant-identity.md` §4.""" pem = _fetch_bridge_public_key(public_key_url) try: return pyjwt.decode( token, pem, algorithms=["RS256"], audience=_EXPECTED_AUDIENCE, issuer=_EXPECTED_ISSUER, leeway=_CLOCK_SKEW_SECONDS, options={"require": ["exp", "iat", "sub", "email", "jti"]}, ) except pyjwt.InvalidSignatureError: log.warning("trusted-jwt signature failed; refetching bridge key", extra={"url": public_key_url}) pem = _fetch_bridge_public_key(public_key_url, force_refresh=True) return pyjwt.decode( token, pem, algorithms=["RS256"], audience=_EXPECTED_AUDIENCE, issuer=_EXPECTED_ISSUER, leeway=_CLOCK_SKEW_SECONDS, options={"require": ["exp", "iat", "sub", "email", "jti"]}, ) class TrustedSignInEndpoint(View): """GET /auth/sign-in-trusted/?token=&next_path= The bridge 302s the browser here after a successful oauth2-proxy session is established. We verify the JWT, claim its `jti` to prevent replay, find-or-create the User, and call user_login() to set the Django session cookie. Then 302 the user to next_path on the same host. """ def get(self, request): public_key_url = _bridge_public_key_url() if not public_key_url: # Endpoint disabled — bridge not wired up in this deployment. return HttpResponseNotFound() # Validate next_path on every exit — even error redirects honor it so # the user lands somewhere sensible. get_safe_redirect_url further # constrains to the trusted base host. next_path = request.GET.get("next_path") or "/" token = request.GET.get("token") if not token: return _redirect_with_error(request, "TRUSTED_JWT_TOKEN_MISSING", "TRUSTED_JWT_TOKEN_MISSING", next_path) try: claims = _verify_with_retry(token, public_key_url) except pyjwt.ExpiredSignatureError: return _redirect_with_error(request, "TRUSTED_JWT_TOKEN_EXPIRED", "TRUSTED_JWT_TOKEN_EXPIRED", next_path) except pyjwt.InvalidTokenError as e: log.warning("trusted-jwt invalid", extra={"err_class": e.__class__.__name__}) return _redirect_with_error(request, "TRUSTED_JWT_TOKEN_INVALID", f"TRUSTED_JWT_TOKEN_INVALID: {e.__class__.__name__}", next_path) except Exception as e: log.error("trusted-jwt key fetch failed", extra={"err": str(e)}) return _redirect_with_error(request, "TRUSTED_JWT_KEY_FETCH_FAILED", "TRUSTED_JWT_KEY_FETCH_FAILED", next_path) # Replay enforcement — atomic SETNX in shared-redis. Fail closed. first_use, replay_err = _consume_jti(claims.get("jti", ""), int(claims.get("exp", 0))) if not first_use: log.warning( "trusted-jwt rejected by replay-store", extra={"jti": claims.get("jti"), "sub": claims.get("sub"), "code": replay_err}, ) return _redirect_with_error(request, replay_err or "TRUSTED_JWT_TOKEN_REPLAYED", replay_err or "TRUSTED_JWT_TOKEN_REPLAYED", next_path) email = (claims.get("email") or "").strip().lower() if not email: return _redirect_with_error(request, "TRUSTED_JWT_TOKEN_INVALID", "TRUSTED_JWT_TOKEN_NO_EMAIL", next_path) # Identity-model rollout (multi-tenant-identity.md §4): prefer # `bb_mailbox` over `email` for User keying. `email` carries the # federation address (e.g., the user's Google login) — it's useful # for auditing but it is NOT the canonical per-tenant identity. The # canonical identity is `bb_mailbox` = `@`, # the address Stalwart actually hosts and the address the operator # invites users at. # # If `bb_mailbox` is absent, fall back to `email` so a transitional # deployment (bridge already upgraded but Zitadel `bb-claims` Action # not yet propagated) keeps working. The WARN log is the operator's # signal that the chain is still incomplete. bb_mailbox = (claims.get("bb_mailbox") or "").strip().lower() if bb_mailbox: lookup_email = bb_mailbox else: log.warning( "trusted-jwt missing bb_mailbox claim; falling back to federation email — " "verify Zitadel `bb-claims` Action is published and the tenant's " "`mail_domain` org metadata is set", extra={"sub": claims.get("sub"), "email": email, "tenant": claims.get("tenant")}, ) lookup_email = email # Find-or-create. We mirror the User-creation shape that # OauthAdapter.complete_login_or_signup() produces (apps/api/plane/ # authentication/adapter/base.py:289-342) — same field set, same # required side-effect of creating a Profile row. Skipping the Profile # would cause Plane's SPA /api/users/me/profile/ to 404 and bounce # the user back to /login in an onboarding loop. user = User.objects.filter(email=lookup_email).first() created = user is None if created: with transaction.atomic(): user = User( email=lookup_email, username=uuid.uuid4().hex, first_name=claims.get("first_name") or claims.get("given_name") or "", last_name=claims.get("last_name") or claims.get("family_name") or "", is_password_autoset=True, is_email_verified=True, ) # Random password — user signs in via SSO; the password exists # only so Django's auth machinery has a non-empty hash and the # break-glass admin pattern can be applied later by ALTER-ing # this user out of band if needed. user.set_password(uuid.uuid4().hex) user.save() # Profile row is mandatory: every Plane API endpoint that # touches user state (workspace listings, onboarding) reads # Profile, and the SPA's /api/users/me/profile/ 404s without it. Profile.objects.create(user=user) # Plane's existing post-auth workflow (default workspace, invitations, etc.) post_user_auth_workflow(user=user, is_signup=created, request=request) # Set Django session cookie via the existing helper. user_login(request=request, user=user, is_app=True) # NOTE: do NOT name extra keys after LogRecord built-in attributes # (`name`, `created`, `levelname`, `module`, `message`, etc.) — # Logger.makeRecord raises KeyError("Attempt to overwrite %r in LogRecord") # on collision. Use is_signup instead of created. log.info( "trusted-jwt sign-in", extra={ "jti": claims.get("jti"), "sub": claims.get("sub"), "email": email, "lookup_email": lookup_email, "bb_mailbox_present": bool(bb_mailbox), "tenant": claims.get("tenant"), "is_signup": created, }, ) target = next_path or get_redirection_path(user=user) response = HttpResponseRedirect( get_safe_redirect_url( base_url=base_host(request=request, is_app=True), next_path=target, params={}, ) ) # Per-app edge-identity validation — set the marker cookie at the same # choke-point that mints the Django session. The middleware in # plane/middleware/bb_edge_identity.py compares this cookie to # `X-Auth-Request-User` on every authenticated request and flushes the # session on mismatch. See # binarybeachio/docs/conventions/per-app-edge-identity-validation.md. edge_sub = request.META.get("HTTP_X_AUTH_REQUEST_USER", "") if edge_sub: response.set_cookie( "_bb_edge_sub", edge_sub, httponly=True, secure=True, samesite="Lax", path="/", ) return response