From 9afde790c6a1855de1714378b1ee3e467a7b0fb1 Mon Sep 17 00:00:00 2001 From: Cyril Pletinckx Date: Sat, 20 Mar 2021 14:07:02 +0100 Subject: [PATCH] email: Open a single SMTP connection to send email batches. Previously the outgoing emails were sent over several SMTP connections through the EmailSendingWorker; establishing a new connection each time adds notable overhead. Redefine EmailSendingWorker worker to be a LoopQueueProcessingWorker, which allows it to handle batches of events. At the same time, persist the connection across email sending, if possible. The connection is initialized in the constructor of the worker in order to keep the same connection throughout the whole process. The concrete implementation of the consume_batch function is simply processing each email one at a time until they have all been sent. In order to reuse the previously implemented decorator to retry sending failures a new method that meets the decorator's required arguments is declared inside the EmailSendingWorker class. This allows to retry the sending process of a particular email inside the batch if the caught exception leaves this process retriable. A second retry mechanism is used inside the initialize_connection function to redo the opening of the connection until it works or until three attempts failed. For this purpose the backoff module has been added to the dependencies and a test has been added to ensure that this retry mechanism works well. The connection is closed when the stop method is called. Fixes: #17672. --- requirements/common.in | 3 +++ requirements/dev.txt | 4 +++ requirements/prod.txt | 4 +++ version.py | 2 +- zerver/lib/send_email.py | 34 ++++++++++++++++++++++-- zerver/lib/test_helpers.py | 3 +++ zerver/tests/test_send_email.py | 43 ++++++++++++++++++++++++++++++- zerver/worker/queue_processors.py | 26 ++++++++++++++++--- 8 files changed, 112 insertions(+), 7 deletions(-) diff --git a/requirements/common.in b/requirements/common.in index 8daafed28e..af5cb3739c 100644 --- a/requirements/common.in +++ b/requirements/common.in @@ -185,3 +185,6 @@ tlds # Unicode Collation Algorithm for sorting multilingual strings pyuca + +# Handle connection retries with exponential backoff +backoff diff --git a/requirements/dev.txt b/requirements/dev.txt index 6b38cef72a..b462254634 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -73,6 +73,10 @@ backcall==0.2.0 \ --hash=sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e \ --hash=sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255 # via ipython +backoff==1.10.0 \ + --hash=sha256:5e73e2cbe780e1915a204799dba0a01896f45f4385e636bcca7a0614d879d0cd \ + --hash=sha256:b8fba021fac74055ac05eb7c7bfce4723aedde6cd0a504e5326bcb0bdd6d19a4 + # via -r requirements/common.in beautifulsoup4==4.9.3 \ --hash=sha256:4c98143716ef1cb40bf7f39a8e3eec8f8b009509e74904ba3a7b315431577e35 \ --hash=sha256:84729e322ad1d5b4d25f805bfa05b902dd96450f43842c4e99067d5e1369eb25 \ diff --git a/requirements/prod.txt b/requirements/prod.txt index 87c7501629..1ba4de9ac1 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -45,6 +45,10 @@ backcall==0.2.0 \ --hash=sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e \ --hash=sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255 # via ipython +backoff==1.10.0 \ + --hash=sha256:5e73e2cbe780e1915a204799dba0a01896f45f4385e636bcca7a0614d879d0cd \ + --hash=sha256:b8fba021fac74055ac05eb7c7bfce4723aedde6cd0a504e5326bcb0bdd6d19a4 + # via -r requirements/common.in beautifulsoup4==4.9.3 \ --hash=sha256:4c98143716ef1cb40bf7f39a8e3eec8f8b009509e74904ba3a7b315431577e35 \ --hash=sha256:84729e322ad1d5b4d25f805bfa05b902dd96450f43842c4e99067d5e1369eb25 \ diff --git a/version.py b/version.py index a86d5d7ca1..52e282d02a 100644 --- a/version.py +++ b/version.py @@ -45,4 +45,4 @@ API_FEATURE_LEVEL = 57 # historical commits sharing the same major version, in which case a # minor version bump suffices. -PROVISION_VERSION = "141.2" +PROVISION_VERSION = "141.3" diff --git a/zerver/lib/send_email.py b/zerver/lib/send_email.py index 77f0d9ca74..0347f3c838 100644 --- a/zerver/lib/send_email.py +++ b/zerver/lib/send_email.py @@ -8,9 +8,12 @@ from email.policy import default from email.utils import formataddr, parseaddr from typing import Any, Dict, List, Mapping, Optional, Tuple +import backoff import orjson from django.conf import settings -from django.core.mail import EmailMultiAlternatives +from django.core.mail import EmailMultiAlternatives, get_connection +from django.core.mail.backends.base import BaseEmailBackend +from django.core.mail.backends.smtp import EmailBackend from django.core.mail.message import sanitize_address from django.core.management import CommandError from django.db import transaction @@ -25,6 +28,8 @@ from scripts.setup.inline_email_css import inline_template from zerver.lib.logging_util import log_to_file from zerver.models import EMAIL_TYPES, Realm, ScheduledEmail, UserProfile, get_user_profile_by_id +MAX_CONNECTION_TRIES = 3 + ## Logging setup ## logger = logging.getLogger("zulip.send_email") @@ -207,6 +212,7 @@ def send_email( language: Optional[str] = None, context: Dict[str, Any] = {}, realm: Optional[Realm] = None, + connection: Optional[BaseEmailBackend] = None, ) -> None: mail = build_email( template_prefix, @@ -222,11 +228,35 @@ def send_email( template = template_prefix.split("/")[-1] logger.info("Sending %s email to %s", template, mail.to) - if mail.send() == 0: + if connection is None: + connection = get_connection() + # This will call .open() for us, which is a no-op if it's already open; + # it will only call .close() if it was not open to begin with + if connection.send_messages([mail]) == 0: logger.error("Error sending %s email to %s", template, mail.to) raise EmailNotDeliveredException +@backoff.on_exception(backoff.expo, OSError, max_tries=MAX_CONNECTION_TRIES, logger=None) +def initialize_connection(connection: Optional[BaseEmailBackend] = None) -> BaseEmailBackend: + if not connection: + connection = get_connection() + if connection.open(): + # If it's a new connection, no need to no-op to check connectivity + return connection + # No-op to ensure that we don't return a connection that has been closed by the mail server + if isinstance(connection, EmailBackend): + try: + status = connection.connection.noop()[0] + except Exception: + status = -1 + if status != 250: + # Close and connect again. + connection.close() + connection.open() + return connection + + def send_future_email( template_prefix: str, realm: Realm, diff --git a/zerver/lib/test_helpers.py b/zerver/lib/test_helpers.py index 079ee3a23d..15840ca74b 100644 --- a/zerver/lib/test_helpers.py +++ b/zerver/lib/test_helpers.py @@ -690,6 +690,9 @@ def mock_queue_publish( ) -> Iterator[mock.MagicMock]: inner = mock.MagicMock(**kwargs) + # This helper ensures that events published to the queues are + # serializable as JSON; unserializable events would make RabbitMQ + # crash in production. def verify_serialize( queue_name: str, event: Dict[str, object], diff --git a/zerver/tests/test_send_email.py b/zerver/tests/test_send_email.py index 7e4b52d56a..d0cc4cb5d6 100644 --- a/zerver/tests/test_send_email.py +++ b/zerver/tests/test_send_email.py @@ -1,6 +1,11 @@ +import smtplib +from unittest import mock + +from django.core.mail.backends.locmem import EmailBackend +from django.core.mail.backends.smtp import EmailBackend as SMTPBackend from django.core.mail.message import sanitize_address -from zerver.lib.send_email import FromAddress, build_email +from zerver.lib.send_email import FromAddress, build_email, initialize_connection from zerver.lib.test_classes import ZulipTestCase OVERLY_LONG_NAME = "Z̷̧̙̯͙̠͇̰̲̞̙͆́͐̅̌͐̔͑̚u̷̼͎̹̻̻̣̞͈̙͛͑̽̉̾̀̅̌͜͠͞ļ̛̫̻̫̰̪̩̠̣̼̏̅́͌̊͞į̴̛̛̩̜̜͕̘̂̑̀̈p̡̛͈͖͓̟͍̿͒̍̽͐͆͂̀ͅ A̰͉̹̅̽̑̕͜͟͡c̷͚̙̘̦̞̫̭͗̋͋̾̑͆̒͟͞c̵̗̹̣̲͚̳̳̮͋̈́̾̉̂͝ͅo̠̣̻̭̰͐́͛̄̂̿̏͊u̴̱̜̯̭̞̠͋͛͐̍̄n̸̡̘̦͕͓̬͌̂̎͊͐̎͌̕ť̮͎̯͎̣̙̺͚̱̌̀́̔͢͝ S͇̯̯̙̳̝͆̊̀͒͛̕ę̛̘̬̺͎͎́̔̊̀͂̓̆̕͢ͅc̨͎̼̯̩̽͒̀̏̄̌̚u̷͉̗͕̼̮͎̬͓͋̃̀͂̈̂̈͊͛ř̶̡͔̺̱̹͓̺́̃̑̉͡͞ͅi̶̺̭͈̬̞̓̒̃͆̅̿̀̄́t͔̹̪͔̥̣̙̍̍̍̉̑̏͑́̌ͅŷ̧̗͈͚̥̗͚͊͑̀͢͜͡" @@ -57,3 +62,39 @@ class TestBuildEmail(ZulipTestCase): language="en", ) self.assertEqual(mail.extra_headers["From"], FromAddress.NOREPLY) + + +class TestSendEmail(ZulipTestCase): + def test_initialize_connection(self) -> None: + # Test the new connection case + with mock.patch.object(EmailBackend, "open", return_value=True): + backend = initialize_connection(None) + self.assertTrue(isinstance(backend, EmailBackend)) + + backend = mock.MagicMock(spec=SMTPBackend) + backend.connection = mock.MagicMock(spec=smtplib.SMTP) + + self.assertTrue(isinstance(backend, SMTPBackend)) + + # Test the old connection case when it is still open + backend.open.return_value = False + backend.connection.noop.return_value = [250] + initialize_connection(backend) + self.assertEqual(backend.open.call_count, 1) + self.assertEqual(backend.connection.noop.call_count, 1) + + # Test the old connection case when it was closed by the server + backend.connection.noop.return_value = [404] + backend.close.return_value = False + initialize_connection(backend) + # 2 more calls to open, 1 more call to noop and 1 call to close + self.assertEqual(backend.open.call_count, 3) + self.assertEqual(backend.connection.noop.call_count, 2) + self.assertEqual(backend.close.call_count, 1) + + # Test backoff procedure + backend.open.side_effect = OSError + with self.assertRaises(OSError): + initialize_connection(backend) + # 3 more calls to open as we try 3 times before giving up + self.assertEqual(backend.open.call_count, 6) diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 20a4305158..649b44da0b 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -37,6 +37,7 @@ from typing import ( import orjson import sentry_sdk from django.conf import settings +from django.core.mail.backends.smtp import EmailBackend from django.db import connection from django.db.models import F from django.utils.timezone import now as timezone_now @@ -82,6 +83,7 @@ from zerver.lib.send_email import ( EmailNotDeliveredException, FromAddress, handle_send_email_format_changes, + initialize_connection, send_email, send_future_email, ) @@ -169,6 +171,9 @@ def check_and_send_restart_signal() -> None: pass +# If you change the function on which this decorator is used be careful that the new +# function doesn't delete the "failed_tries" attribute of "data" which is needed for +# "retry_event" to work correctly; see EmailSendingWorker for an example with deepcopy. def retry_send_email_failures( func: Callable[[ConcreteQueueWorker, Dict[str, Any]], None], ) -> Callable[[ConcreteQueueWorker, Dict[str, Any]], None]: @@ -609,9 +614,13 @@ class MissedMessageWorker(QueueProcessingWorker): @assign_queue("email_senders") -class EmailSendingWorker(QueueProcessingWorker): +class EmailSendingWorker(LoopQueueProcessingWorker): + def __init__(self) -> None: + super().__init__() + self.connection: EmailBackend = initialize_connection(None) + @retry_send_email_failures - def consume(self, event: Dict[str, Any]) -> None: + def send_email(self, event: Dict[str, Any]) -> None: # Copy the event, so that we don't pass the `failed_tries' # data to send_email (which neither takes that # argument nor needs that data). @@ -619,7 +628,18 @@ class EmailSendingWorker(QueueProcessingWorker): if "failed_tries" in copied_event: del copied_event["failed_tries"] handle_send_email_format_changes(copied_event) - send_email(**copied_event) + self.connection = initialize_connection(self.connection) + send_email(**copied_event, connection=self.connection) + + def consume_batch(self, events: List[Dict[str, Any]]) -> None: + for event in events: + self.send_email(event) + + def stop(self) -> None: + try: + self.connection.close() + finally: + super().stop() @assign_queue("missedmessage_mobile_notifications")