[WEB-3926] chore: removed the duplicated webhook task and updated the webhook task to handle exceptions correctly (#6951)

* chore: removed the duplicated webhook function

* chore: update webhook send task to handle errors

---------

Co-authored-by: pablohashescobar <nikhilschacko@gmail.com>
This commit is contained in:
Bavisetti Narayan 2025-04-29 14:04:00 +05:30 committed by GitHub
parent 298e3dc9ca
commit baabb82669
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 146 additions and 191 deletions

View file

@ -1650,40 +1650,6 @@ def issue_activity(
# Save all the values to database
issue_activities_created = IssueActivity.objects.bulk_create(issue_activities)
# Post the updates to segway for integrations and webhooks
if len(issue_activities_created):
for activity in issue_activities_created:
webhook_activity.delay(
event=(
"issue_comment"
if activity.field == "comment"
else "intake_issue"
if intake
else "issue"
),
event_id=(
activity.issue_comment_id
if activity.field == "comment"
else intake
if intake
else activity.issue_id
),
verb=activity.verb,
field=(
"description" if activity.field == "comment" else activity.field
),
old_value=(
activity.old_value if activity.old_value != "" else None
),
new_value=(
activity.new_value if activity.new_value != "" else None
),
actor_id=activity.actor_id,
current_site=origin,
slug=activity.workspace.slug,
old_identifier=activity.old_identifier,
new_identifier=activity.new_identifier,
)
if notification:
notifications.delay(

View file

@ -5,6 +5,7 @@ import logging
import uuid
import requests
from typing import Any, Dict, List, Optional, Union
# Third party imports
from celery import shared_task
@ -70,150 +71,89 @@ MODEL_MAPPER = {
}
def get_model_data(event, event_id, many=False):
logger = logging.getLogger("plane.worker")
def get_model_data(
event: str, event_id: Union[str, List[str]], many: bool = False
) -> Dict[str, Any]:
"""
Retrieve and serialize model data based on the event type.
Args:
event (str): The type of event/model to retrieve data for
event_id (Union[str, List[str]]): The ID or list of IDs of the model instance(s)
many (bool): Whether to retrieve multiple instances
Returns:
Dict[str, Any]: Serialized model data
Raises:
ValueError: If serializer is not found for the event
ObjectDoesNotExist: If model instance is not found
"""
model = MODEL_MAPPER.get(event)
if many:
queryset = model.objects.filter(pk__in=event_id)
else:
queryset = model.objects.get(pk=event_id)
serializer = SERIALIZER_MAPPER.get(event)
return serializer(queryset, many=many).data
if model is None:
raise ValueError(f"Model not found for event: {event}")
@shared_task(
bind=True,
autoretry_for=(requests.RequestException,),
retry_backoff=600,
max_retries=5,
retry_jitter=True,
)
def webhook_task(self, webhook, slug, event, event_data, action, current_site):
try:
webhook = Webhook.objects.get(id=webhook, workspace__slug=slug)
if many:
queryset = model.objects.filter(pk__in=event_id)
else:
queryset = model.objects.get(pk=event_id)
headers = {
"Content-Type": "application/json",
"User-Agent": "Autopilot",
"X-Plane-Delivery": str(uuid.uuid4()),
"X-Plane-Event": event,
}
serializer = SERIALIZER_MAPPER.get(event)
if serializer is None:
raise ValueError(f"Serializer not found for event: {event}")
# # Your secret key
event_data = (
json.loads(json.dumps(event_data, cls=DjangoJSONEncoder))
if event_data is not None
else None
)
action = {
"POST": "create",
"PATCH": "update",
"PUT": "update",
"DELETE": "delete",
}.get(action, action)
payload = {
"event": event,
"action": action,
"webhook_id": str(webhook.id),
"workspace_id": str(webhook.workspace_id),
"data": event_data,
}
# Use HMAC for generating signature
if webhook.secret_key:
hmac_signature = hmac.new(
webhook.secret_key.encode("utf-8"),
json.dumps(payload).encode("utf-8"),
hashlib.sha256,
)
signature = hmac_signature.hexdigest()
headers["X-Plane-Signature"] = signature
# Send the webhook event
response = requests.post(webhook.url, headers=headers, json=payload, timeout=30)
# Log the webhook request
WebhookLog.objects.create(
workspace_id=str(webhook.workspace_id),
webhook=str(webhook.id),
event_type=str(event),
request_method=str(action),
request_headers=str(headers),
request_body=str(payload),
response_status=str(response.status_code),
response_headers=str(response.headers),
response_body=str(response.text),
retry_count=str(self.request.retries),
)
except Webhook.DoesNotExist:
return
except requests.RequestException as e:
# Log the failed webhook request
WebhookLog.objects.create(
workspace_id=str(webhook.workspace_id),
webhook=str(webhook.id),
event_type=str(event),
request_method=str(action),
request_headers=str(headers),
request_body=str(payload),
response_status=500,
response_headers="",
response_body=str(e),
retry_count=str(self.request.retries),
)
# Retry logic
if self.request.retries >= self.max_retries:
Webhook.objects.filter(pk=webhook.id).update(is_active=False)
if webhook:
# send email for the deactivation of the webhook
send_webhook_deactivation_email(
webhook_id=webhook.id,
receiver_id=webhook.created_by_id,
reason=str(e),
current_site=current_site,
)
return
raise requests.RequestException()
except Exception as e:
if settings.DEBUG:
print(e)
log_exception(e)
return
return serializer(queryset, many=many).data
except ObjectDoesNotExist:
raise ObjectDoesNotExist(f"No {event} found with id: {event_id}")
@shared_task
def send_webhook_deactivation_email(webhook_id, receiver_id, current_site, reason):
# Get email configurations
(
EMAIL_HOST,
EMAIL_HOST_USER,
EMAIL_HOST_PASSWORD,
EMAIL_PORT,
EMAIL_USE_TLS,
EMAIL_USE_SSL,
EMAIL_FROM,
) = get_email_configuration()
receiver = User.objects.get(pk=receiver_id)
webhook = Webhook.objects.get(pk=webhook_id)
subject = "Webhook Deactivated"
message = f"Webhook {webhook.url} has been deactivated due to failed requests."
# Send the mail
context = {
"email": receiver.email,
"message": message,
"webhook_url": f"{current_site}/{str(webhook.workspace.slug)}/settings/webhooks/{str(webhook.id)}",
}
html_content = render_to_string(
"emails/notifications/webhook-deactivate.html", context
)
text_content = strip_tags(html_content)
def send_webhook_deactivation_email(
webhook_id: str, receiver_id: str, current_site: str, reason: str
) -> None:
"""
Send an email notification when a webhook is deactivated.
Args:
webhook_id (str): ID of the deactivated webhook
receiver_id (str): ID of the user to receive the notification
current_site (str): Current site URL
reason (str): Reason for webhook deactivation
"""
try:
(
EMAIL_HOST,
EMAIL_HOST_USER,
EMAIL_HOST_PASSWORD,
EMAIL_PORT,
EMAIL_USE_TLS,
EMAIL_USE_SSL,
EMAIL_FROM,
) = get_email_configuration()
receiver = User.objects.get(pk=receiver_id)
webhook = Webhook.objects.get(pk=webhook_id)
# Get the webhook payload
subject = "Webhook Deactivated"
message = f"Webhook {webhook.url} has been deactivated due to failed requests."
# Send the mail
context = {
"email": receiver.email,
"message": message,
"webhook_url": f"{current_site}/{str(webhook.workspace.slug)}/settings/webhooks/{str(webhook.id)}",
}
html_content = render_to_string(
"emails/notifications/webhook-deactivate.html", context
)
text_content = strip_tags(html_content)
# Set the email connection
connection = get_connection(
host=EMAIL_HOST,
port=int(EMAIL_PORT),
@ -223,6 +163,7 @@ def send_webhook_deactivation_email(webhook_id, receiver_id, current_site, reaso
use_ssl=EMAIL_USE_SSL == "1",
)
# Create the email message
msg = EmailMultiAlternatives(
subject=subject,
body=text_content,
@ -232,11 +173,10 @@ def send_webhook_deactivation_email(webhook_id, receiver_id, current_site, reaso
)
msg.attach_alternative(html_content, "text/html")
msg.send()
logging.getLogger("plane").info("Email sent successfully.")
return
logger.info("Email sent successfully.")
except Exception as e:
log_exception(e)
return
logger.error(f"Failed to send email: {e}")
@shared_task(
@ -247,10 +187,29 @@ def send_webhook_deactivation_email(webhook_id, receiver_id, current_site, reaso
retry_jitter=True,
)
def webhook_send_task(
self, webhook, slug, event, event_data, action, current_site, activity
):
self,
webhook_id: str,
slug: str,
event: str,
event_data: Optional[Dict[str, Any]],
action: str,
current_site: str,
activity: Optional[Dict[str, Any]],
) -> None:
"""
Send webhook notifications to configured endpoints.
Args:
webhook (str): Webhook ID
slug (str): Workspace slug
event (str): Event type
event_data (Optional[Dict[str, Any]]): Event data to be sent
action (str): HTTP method/action
current_site (str): Current site URL
activity (Optional[Dict[str, Any]]): Activity data
"""
try:
webhook = Webhook.objects.get(id=webhook, workspace__slug=slug)
webhook = Webhook.objects.get(id=webhook_id, workspace__slug=slug)
headers = {
"Content-Type": "application/json",
@ -297,7 +256,12 @@ def webhook_send_task(
)
signature = hmac_signature.hexdigest()
headers["X-Plane-Signature"] = signature
except Exception as e:
log_exception(e)
logger.error(f"Failed to send webhook: {e}")
return
try:
# Send the webhook event
response = requests.post(webhook.url, headers=headers, json=payload, timeout=30)
@ -314,7 +278,7 @@ def webhook_send_task(
response_body=str(response.text),
retry_count=str(self.request.retries),
)
logger.info(f"Webhook {webhook.id} sent successfully")
except requests.RequestException as e:
# Log the failed webhook request
WebhookLog.objects.create(
@ -329,12 +293,13 @@ def webhook_send_task(
response_body=str(e),
retry_count=str(self.request.retries),
)
logger.error(f"Webhook {webhook.id} failed with error: {e}")
# Retry logic
if self.request.retries >= self.max_retries:
Webhook.objects.filter(pk=webhook.id).update(is_active=False)
if webhook:
# send email for the deactivation of the webhook
send_webhook_deactivation_email(
send_webhook_deactivation_email.delay(
webhook_id=webhook.id,
receiver_id=webhook.created_by_id,
reason=str(e),
@ -344,26 +309,50 @@ def webhook_send_task(
raise requests.RequestException()
except Exception as e:
if settings.DEBUG:
print(e)
log_exception(e)
return
@shared_task
def webhook_activity(
event,
verb,
field,
old_value,
new_value,
actor_id,
slug,
current_site,
event_id,
old_identifier,
new_identifier,
):
event: str,
verb: str,
field: Optional[str],
old_value: Any,
new_value: Any,
actor_id: str | uuid.UUID,
slug: str,
current_site: str,
event_id: str | uuid.UUID,
old_identifier: Optional[str],
new_identifier: Optional[str],
) -> None:
"""
Process and send webhook notifications for various activities in the system.
This task filters relevant webhooks based on the event type and sends notifications
to all active webhooks for the workspace.
Args:
event (str): Type of event (project, issue, module, cycle, issue_comment)
verb (str): Action performed (created, updated, deleted)
field (Optional[str]): Name of the field that was changed
old_value (Any): Previous value of the field
new_value (Any): New value of the field
actor_id (str | uuid.UUID): ID of the user who performed the action
slug (str): Workspace slug
current_site (str): Current site URL
event_id (str | uuid.UUID): ID of the event object
old_identifier (Optional[str]): Previous identifier if any
new_identifier (Optional[str]): New identifier if any
Returns:
None
Note:
The function silently returns on ObjectDoesNotExist exceptions to handle
race conditions where objects might have been deleted.
"""
try:
webhooks = Webhook.objects.filter(workspace__slug=slug, is_active=True)
@ -384,7 +373,7 @@ def webhook_activity(
for webhook in webhooks:
webhook_send_task.delay(
webhook=webhook.id,
webhook_id=webhook.id,
slug=slug,
event=event,
event_data=(