missed-message: Lock ScheduledMessageNotificationEmail rows.

This prevents the rows from being deleted out from under the worker
while it is sending emails.
This commit is contained in:
Alex Vandiver 2023-07-12 17:30:14 +00:00 committed by Tim Abbott
parent d87895a3ef
commit be960f4142
1 changed files with 2 additions and 1 deletions

View File

@ -590,6 +590,7 @@ class MissedMessageWorker(QueueProcessingWorker):
logging.debug("Processing missedmessage_emails event: %s", event) logging.debug("Processing missedmessage_emails event: %s", event)
# When we consume an event, check if there are existing pending emails # When we consume an event, check if there are existing pending emails
# for that user, and if so use the same scheduled timestamp. # for that user, and if so use the same scheduled timestamp.
user_profile_id: int = event["user_profile_id"] user_profile_id: int = event["user_profile_id"]
user_profile = get_user_profile_by_id(user_profile_id) user_profile = get_user_profile_by_id(user_profile_id)
batch_duration_seconds = user_profile.email_notifications_batching_period_seconds batch_duration_seconds = user_profile.email_notifications_batching_period_seconds
@ -714,7 +715,7 @@ class MissedMessageWorker(QueueProcessingWorker):
with transaction.atomic(): with transaction.atomic():
events_to_process = ScheduledMessageNotificationEmail.objects.filter( events_to_process = ScheduledMessageNotificationEmail.objects.filter(
scheduled_timestamp__lte=current_time scheduled_timestamp__lte=current_time
) ).select_for_update()
# Batch the entries by user # Batch the entries by user
events_by_recipient: Dict[int, Dict[int, MissedMessageData]] = defaultdict(dict) events_by_recipient: Dict[int, Dict[int, MissedMessageData]] = defaultdict(dict)