diff --git a/apiserver/plane/bgtasks/issue_activities_task.py b/apiserver/plane/bgtasks/issue_activities_task.py index fcd75f8e3..4def8e8ca 100644 --- a/apiserver/plane/bgtasks/issue_activities_task.py +++ b/apiserver/plane/bgtasks/issue_activities_task.py @@ -1650,40 +1650,6 @@ def issue_activity( # Save all the values to database issue_activities_created = IssueActivity.objects.bulk_create(issue_activities) - # Post the updates to segway for integrations and webhooks - if len(issue_activities_created): - for activity in issue_activities_created: - webhook_activity.delay( - event=( - "issue_comment" - if activity.field == "comment" - else "intake_issue" - if intake - else "issue" - ), - event_id=( - activity.issue_comment_id - if activity.field == "comment" - else intake - if intake - else activity.issue_id - ), - verb=activity.verb, - field=( - "description" if activity.field == "comment" else activity.field - ), - old_value=( - activity.old_value if activity.old_value != "" else None - ), - new_value=( - activity.new_value if activity.new_value != "" else None - ), - actor_id=activity.actor_id, - current_site=origin, - slug=activity.workspace.slug, - old_identifier=activity.old_identifier, - new_identifier=activity.new_identifier, - ) if notification: notifications.delay( diff --git a/apiserver/plane/bgtasks/webhook_task.py b/apiserver/plane/bgtasks/webhook_task.py index c1ea01a4d..0bcfd2693 100644 --- a/apiserver/plane/bgtasks/webhook_task.py +++ b/apiserver/plane/bgtasks/webhook_task.py @@ -5,6 +5,7 @@ import logging import uuid import requests +from typing import Any, Dict, List, Optional, Union # Third party imports from celery import shared_task @@ -70,150 +71,89 @@ MODEL_MAPPER = { } -def get_model_data(event, event_id, many=False): +logger = logging.getLogger("plane.worker") + + +def get_model_data( + event: str, event_id: Union[str, List[str]], many: bool = False +) -> Dict[str, Any]: + """ + Retrieve and serialize model data based on the event type. + + Args: + event (str): The type of event/model to retrieve data for + event_id (Union[str, List[str]]): The ID or list of IDs of the model instance(s) + many (bool): Whether to retrieve multiple instances + + Returns: + Dict[str, Any]: Serialized model data + + Raises: + ValueError: If serializer is not found for the event + ObjectDoesNotExist: If model instance is not found + """ model = MODEL_MAPPER.get(event) - if many: - queryset = model.objects.filter(pk__in=event_id) - else: - queryset = model.objects.get(pk=event_id) - serializer = SERIALIZER_MAPPER.get(event) - return serializer(queryset, many=many).data + if model is None: + raise ValueError(f"Model not found for event: {event}") - -@shared_task( - bind=True, - autoretry_for=(requests.RequestException,), - retry_backoff=600, - max_retries=5, - retry_jitter=True, -) -def webhook_task(self, webhook, slug, event, event_data, action, current_site): try: - webhook = Webhook.objects.get(id=webhook, workspace__slug=slug) + if many: + queryset = model.objects.filter(pk__in=event_id) + else: + queryset = model.objects.get(pk=event_id) - headers = { - "Content-Type": "application/json", - "User-Agent": "Autopilot", - "X-Plane-Delivery": str(uuid.uuid4()), - "X-Plane-Event": event, - } + serializer = SERIALIZER_MAPPER.get(event) + if serializer is None: + raise ValueError(f"Serializer not found for event: {event}") - # # Your secret key - event_data = ( - json.loads(json.dumps(event_data, cls=DjangoJSONEncoder)) - if event_data is not None - else None - ) - - action = { - "POST": "create", - "PATCH": "update", - "PUT": "update", - "DELETE": "delete", - }.get(action, action) - - payload = { - "event": event, - "action": action, - "webhook_id": str(webhook.id), - "workspace_id": str(webhook.workspace_id), - "data": event_data, - } - - # Use HMAC for generating signature - if webhook.secret_key: - hmac_signature = hmac.new( - webhook.secret_key.encode("utf-8"), - json.dumps(payload).encode("utf-8"), - hashlib.sha256, - ) - signature = hmac_signature.hexdigest() - headers["X-Plane-Signature"] = signature - - # Send the webhook event - response = requests.post(webhook.url, headers=headers, json=payload, timeout=30) - - # Log the webhook request - WebhookLog.objects.create( - workspace_id=str(webhook.workspace_id), - webhook=str(webhook.id), - event_type=str(event), - request_method=str(action), - request_headers=str(headers), - request_body=str(payload), - response_status=str(response.status_code), - response_headers=str(response.headers), - response_body=str(response.text), - retry_count=str(self.request.retries), - ) - - except Webhook.DoesNotExist: - return - except requests.RequestException as e: - # Log the failed webhook request - WebhookLog.objects.create( - workspace_id=str(webhook.workspace_id), - webhook=str(webhook.id), - event_type=str(event), - request_method=str(action), - request_headers=str(headers), - request_body=str(payload), - response_status=500, - response_headers="", - response_body=str(e), - retry_count=str(self.request.retries), - ) - # Retry logic - if self.request.retries >= self.max_retries: - Webhook.objects.filter(pk=webhook.id).update(is_active=False) - if webhook: - # send email for the deactivation of the webhook - send_webhook_deactivation_email( - webhook_id=webhook.id, - receiver_id=webhook.created_by_id, - reason=str(e), - current_site=current_site, - ) - return - raise requests.RequestException() - - except Exception as e: - if settings.DEBUG: - print(e) - log_exception(e) - return + return serializer(queryset, many=many).data + except ObjectDoesNotExist: + raise ObjectDoesNotExist(f"No {event} found with id: {event_id}") @shared_task -def send_webhook_deactivation_email(webhook_id, receiver_id, current_site, reason): - # Get email configurations - ( - EMAIL_HOST, - EMAIL_HOST_USER, - EMAIL_HOST_PASSWORD, - EMAIL_PORT, - EMAIL_USE_TLS, - EMAIL_USE_SSL, - EMAIL_FROM, - ) = get_email_configuration() - - receiver = User.objects.get(pk=receiver_id) - webhook = Webhook.objects.get(pk=webhook_id) - subject = "Webhook Deactivated" - message = f"Webhook {webhook.url} has been deactivated due to failed requests." - - # Send the mail - context = { - "email": receiver.email, - "message": message, - "webhook_url": f"{current_site}/{str(webhook.workspace.slug)}/settings/webhooks/{str(webhook.id)}", - } - html_content = render_to_string( - "emails/notifications/webhook-deactivate.html", context - ) - text_content = strip_tags(html_content) +def send_webhook_deactivation_email( + webhook_id: str, receiver_id: str, current_site: str, reason: str +) -> None: + """ + Send an email notification when a webhook is deactivated. + Args: + webhook_id (str): ID of the deactivated webhook + receiver_id (str): ID of the user to receive the notification + current_site (str): Current site URL + reason (str): Reason for webhook deactivation + """ try: + ( + EMAIL_HOST, + EMAIL_HOST_USER, + EMAIL_HOST_PASSWORD, + EMAIL_PORT, + EMAIL_USE_TLS, + EMAIL_USE_SSL, + EMAIL_FROM, + ) = get_email_configuration() + + receiver = User.objects.get(pk=receiver_id) + webhook = Webhook.objects.get(pk=webhook_id) + + # Get the webhook payload + subject = "Webhook Deactivated" + message = f"Webhook {webhook.url} has been deactivated due to failed requests." + + # Send the mail + context = { + "email": receiver.email, + "message": message, + "webhook_url": f"{current_site}/{str(webhook.workspace.slug)}/settings/webhooks/{str(webhook.id)}", + } + html_content = render_to_string( + "emails/notifications/webhook-deactivate.html", context + ) + text_content = strip_tags(html_content) + + # Set the email connection connection = get_connection( host=EMAIL_HOST, port=int(EMAIL_PORT), @@ -223,6 +163,7 @@ def send_webhook_deactivation_email(webhook_id, receiver_id, current_site, reaso use_ssl=EMAIL_USE_SSL == "1", ) + # Create the email message msg = EmailMultiAlternatives( subject=subject, body=text_content, @@ -232,11 +173,10 @@ def send_webhook_deactivation_email(webhook_id, receiver_id, current_site, reaso ) msg.attach_alternative(html_content, "text/html") msg.send() - logging.getLogger("plane").info("Email sent successfully.") - return + logger.info("Email sent successfully.") except Exception as e: log_exception(e) - return + logger.error(f"Failed to send email: {e}") @shared_task( @@ -247,10 +187,29 @@ def send_webhook_deactivation_email(webhook_id, receiver_id, current_site, reaso retry_jitter=True, ) def webhook_send_task( - self, webhook, slug, event, event_data, action, current_site, activity -): + self, + webhook_id: str, + slug: str, + event: str, + event_data: Optional[Dict[str, Any]], + action: str, + current_site: str, + activity: Optional[Dict[str, Any]], +) -> None: + """ + Send webhook notifications to configured endpoints. + + Args: + webhook (str): Webhook ID + slug (str): Workspace slug + event (str): Event type + event_data (Optional[Dict[str, Any]]): Event data to be sent + action (str): HTTP method/action + current_site (str): Current site URL + activity (Optional[Dict[str, Any]]): Activity data + """ try: - webhook = Webhook.objects.get(id=webhook, workspace__slug=slug) + webhook = Webhook.objects.get(id=webhook_id, workspace__slug=slug) headers = { "Content-Type": "application/json", @@ -297,7 +256,12 @@ def webhook_send_task( ) signature = hmac_signature.hexdigest() headers["X-Plane-Signature"] = signature + except Exception as e: + log_exception(e) + logger.error(f"Failed to send webhook: {e}") + return + try: # Send the webhook event response = requests.post(webhook.url, headers=headers, json=payload, timeout=30) @@ -314,7 +278,7 @@ def webhook_send_task( response_body=str(response.text), retry_count=str(self.request.retries), ) - + logger.info(f"Webhook {webhook.id} sent successfully") except requests.RequestException as e: # Log the failed webhook request WebhookLog.objects.create( @@ -329,12 +293,13 @@ def webhook_send_task( response_body=str(e), retry_count=str(self.request.retries), ) + logger.error(f"Webhook {webhook.id} failed with error: {e}") # Retry logic if self.request.retries >= self.max_retries: Webhook.objects.filter(pk=webhook.id).update(is_active=False) if webhook: # send email for the deactivation of the webhook - send_webhook_deactivation_email( + send_webhook_deactivation_email.delay( webhook_id=webhook.id, receiver_id=webhook.created_by_id, reason=str(e), @@ -344,26 +309,50 @@ def webhook_send_task( raise requests.RequestException() except Exception as e: - if settings.DEBUG: - print(e) log_exception(e) return @shared_task def webhook_activity( - event, - verb, - field, - old_value, - new_value, - actor_id, - slug, - current_site, - event_id, - old_identifier, - new_identifier, -): + event: str, + verb: str, + field: Optional[str], + old_value: Any, + new_value: Any, + actor_id: str | uuid.UUID, + slug: str, + current_site: str, + event_id: str | uuid.UUID, + old_identifier: Optional[str], + new_identifier: Optional[str], +) -> None: + """ + Process and send webhook notifications for various activities in the system. + + This task filters relevant webhooks based on the event type and sends notifications + to all active webhooks for the workspace. + + Args: + event (str): Type of event (project, issue, module, cycle, issue_comment) + verb (str): Action performed (created, updated, deleted) + field (Optional[str]): Name of the field that was changed + old_value (Any): Previous value of the field + new_value (Any): New value of the field + actor_id (str | uuid.UUID): ID of the user who performed the action + slug (str): Workspace slug + current_site (str): Current site URL + event_id (str | uuid.UUID): ID of the event object + old_identifier (Optional[str]): Previous identifier if any + new_identifier (Optional[str]): New identifier if any + + Returns: + None + + Note: + The function silently returns on ObjectDoesNotExist exceptions to handle + race conditions where objects might have been deleted. + """ try: webhooks = Webhook.objects.filter(workspace__slug=slug, is_active=True) @@ -384,7 +373,7 @@ def webhook_activity( for webhook in webhooks: webhook_send_task.delay( - webhook=webhook.id, + webhook_id=webhook.id, slug=slug, event=event, event_data=(