diff --git a/requirements/common.in b/requirements/common.in index 8daafed28e..af5cb3739c 100644 --- a/requirements/common.in +++ b/requirements/common.in @@ -185,3 +185,6 @@ tlds # Unicode Collation Algorithm for sorting multilingual strings pyuca + +# Handle connection retries with exponential backoff +backoff diff --git a/requirements/dev.txt b/requirements/dev.txt index 6b38cef72a..b462254634 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -73,6 +73,10 @@ backcall==0.2.0 \ --hash=sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e \ --hash=sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255 # via ipython +backoff==1.10.0 \ + --hash=sha256:5e73e2cbe780e1915a204799dba0a01896f45f4385e636bcca7a0614d879d0cd \ + --hash=sha256:b8fba021fac74055ac05eb7c7bfce4723aedde6cd0a504e5326bcb0bdd6d19a4 + # via -r requirements/common.in beautifulsoup4==4.9.3 \ --hash=sha256:4c98143716ef1cb40bf7f39a8e3eec8f8b009509e74904ba3a7b315431577e35 \ --hash=sha256:84729e322ad1d5b4d25f805bfa05b902dd96450f43842c4e99067d5e1369eb25 \ diff --git a/requirements/prod.txt b/requirements/prod.txt index 87c7501629..1ba4de9ac1 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -45,6 +45,10 @@ backcall==0.2.0 \ --hash=sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e \ --hash=sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255 # via ipython +backoff==1.10.0 \ + --hash=sha256:5e73e2cbe780e1915a204799dba0a01896f45f4385e636bcca7a0614d879d0cd \ + --hash=sha256:b8fba021fac74055ac05eb7c7bfce4723aedde6cd0a504e5326bcb0bdd6d19a4 + # via -r requirements/common.in beautifulsoup4==4.9.3 \ --hash=sha256:4c98143716ef1cb40bf7f39a8e3eec8f8b009509e74904ba3a7b315431577e35 \ --hash=sha256:84729e322ad1d5b4d25f805bfa05b902dd96450f43842c4e99067d5e1369eb25 \ diff --git a/version.py b/version.py index a86d5d7ca1..52e282d02a 100644 --- a/version.py +++ b/version.py @@ -45,4 +45,4 @@ API_FEATURE_LEVEL = 57 # historical commits sharing the same major version, in which case a # minor version bump suffices. -PROVISION_VERSION = "141.2" +PROVISION_VERSION = "141.3" diff --git a/zerver/lib/send_email.py b/zerver/lib/send_email.py index 77f0d9ca74..0347f3c838 100644 --- a/zerver/lib/send_email.py +++ b/zerver/lib/send_email.py @@ -8,9 +8,12 @@ from email.policy import default from email.utils import formataddr, parseaddr from typing import Any, Dict, List, Mapping, Optional, Tuple +import backoff import orjson from django.conf import settings -from django.core.mail import EmailMultiAlternatives +from django.core.mail import EmailMultiAlternatives, get_connection +from django.core.mail.backends.base import BaseEmailBackend +from django.core.mail.backends.smtp import EmailBackend from django.core.mail.message import sanitize_address from django.core.management import CommandError from django.db import transaction @@ -25,6 +28,8 @@ from scripts.setup.inline_email_css import inline_template from zerver.lib.logging_util import log_to_file from zerver.models import EMAIL_TYPES, Realm, ScheduledEmail, UserProfile, get_user_profile_by_id +MAX_CONNECTION_TRIES = 3 + ## Logging setup ## logger = logging.getLogger("zulip.send_email") @@ -207,6 +212,7 @@ def send_email( language: Optional[str] = None, context: Dict[str, Any] = {}, realm: Optional[Realm] = None, + connection: Optional[BaseEmailBackend] = None, ) -> None: mail = build_email( template_prefix, @@ -222,11 +228,35 @@ def send_email( template = template_prefix.split("/")[-1] logger.info("Sending %s email to %s", template, mail.to) - if mail.send() == 0: + if connection is None: + connection = get_connection() + # This will call .open() for us, which is a no-op if it's already open; + # it will only call .close() if it was not open to begin with + if connection.send_messages([mail]) == 0: logger.error("Error sending %s email to %s", template, mail.to) raise EmailNotDeliveredException +@backoff.on_exception(backoff.expo, OSError, max_tries=MAX_CONNECTION_TRIES, logger=None) +def initialize_connection(connection: Optional[BaseEmailBackend] = None) -> BaseEmailBackend: + if not connection: + connection = get_connection() + if connection.open(): + # If it's a new connection, no need to no-op to check connectivity + return connection + # No-op to ensure that we don't return a connection that has been closed by the mail server + if isinstance(connection, EmailBackend): + try: + status = connection.connection.noop()[0] + except Exception: + status = -1 + if status != 250: + # Close and connect again. + connection.close() + connection.open() + return connection + + def send_future_email( template_prefix: str, realm: Realm, diff --git a/zerver/lib/test_helpers.py b/zerver/lib/test_helpers.py index 079ee3a23d..15840ca74b 100644 --- a/zerver/lib/test_helpers.py +++ b/zerver/lib/test_helpers.py @@ -690,6 +690,9 @@ def mock_queue_publish( ) -> Iterator[mock.MagicMock]: inner = mock.MagicMock(**kwargs) + # This helper ensures that events published to the queues are + # serializable as JSON; unserializable events would make RabbitMQ + # crash in production. def verify_serialize( queue_name: str, event: Dict[str, object], diff --git a/zerver/tests/test_send_email.py b/zerver/tests/test_send_email.py index 7e4b52d56a..d0cc4cb5d6 100644 --- a/zerver/tests/test_send_email.py +++ b/zerver/tests/test_send_email.py @@ -1,6 +1,11 @@ +import smtplib +from unittest import mock + +from django.core.mail.backends.locmem import EmailBackend +from django.core.mail.backends.smtp import EmailBackend as SMTPBackend from django.core.mail.message import sanitize_address -from zerver.lib.send_email import FromAddress, build_email +from zerver.lib.send_email import FromAddress, build_email, initialize_connection from zerver.lib.test_classes import ZulipTestCase OVERLY_LONG_NAME = "Z̷̧̙̯͙̠͇̰̲̞̙͆́͐̅̌͐̔͑̚u̷̼͎̹̻̻̣̞͈̙͛͑̽̉̾̀̅̌͜͠͞ļ̛̫̻̫̰̪̩̠̣̼̏̅́͌̊͞į̴̛̛̩̜̜͕̘̂̑̀̈p̡̛͈͖͓̟͍̿͒̍̽͐͆͂̀ͅ A̰͉̹̅̽̑̕͜͟͡c̷͚̙̘̦̞̫̭͗̋͋̾̑͆̒͟͞c̵̗̹̣̲͚̳̳̮͋̈́̾̉̂͝ͅo̠̣̻̭̰͐́͛̄̂̿̏͊u̴̱̜̯̭̞̠͋͛͐̍̄n̸̡̘̦͕͓̬͌̂̎͊͐̎͌̕ť̮͎̯͎̣̙̺͚̱̌̀́̔͢͝ S͇̯̯̙̳̝͆̊̀͒͛̕ę̛̘̬̺͎͎́̔̊̀͂̓̆̕͢ͅc̨͎̼̯̩̽͒̀̏̄̌̚u̷͉̗͕̼̮͎̬͓͋̃̀͂̈̂̈͊͛ř̶̡͔̺̱̹͓̺́̃̑̉͡͞ͅi̶̺̭͈̬̞̓̒̃͆̅̿̀̄́t͔̹̪͔̥̣̙̍̍̍̉̑̏͑́̌ͅŷ̧̗͈͚̥̗͚͊͑̀͢͜͡" @@ -57,3 +62,39 @@ class TestBuildEmail(ZulipTestCase): language="en", ) self.assertEqual(mail.extra_headers["From"], FromAddress.NOREPLY) + + +class TestSendEmail(ZulipTestCase): + def test_initialize_connection(self) -> None: + # Test the new connection case + with mock.patch.object(EmailBackend, "open", return_value=True): + backend = initialize_connection(None) + self.assertTrue(isinstance(backend, EmailBackend)) + + backend = mock.MagicMock(spec=SMTPBackend) + backend.connection = mock.MagicMock(spec=smtplib.SMTP) + + self.assertTrue(isinstance(backend, SMTPBackend)) + + # Test the old connection case when it is still open + backend.open.return_value = False + backend.connection.noop.return_value = [250] + initialize_connection(backend) + self.assertEqual(backend.open.call_count, 1) + self.assertEqual(backend.connection.noop.call_count, 1) + + # Test the old connection case when it was closed by the server + backend.connection.noop.return_value = [404] + backend.close.return_value = False + initialize_connection(backend) + # 2 more calls to open, 1 more call to noop and 1 call to close + self.assertEqual(backend.open.call_count, 3) + self.assertEqual(backend.connection.noop.call_count, 2) + self.assertEqual(backend.close.call_count, 1) + + # Test backoff procedure + backend.open.side_effect = OSError + with self.assertRaises(OSError): + initialize_connection(backend) + # 3 more calls to open as we try 3 times before giving up + self.assertEqual(backend.open.call_count, 6) diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 20a4305158..649b44da0b 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -37,6 +37,7 @@ from typing import ( import orjson import sentry_sdk from django.conf import settings +from django.core.mail.backends.smtp import EmailBackend from django.db import connection from django.db.models import F from django.utils.timezone import now as timezone_now @@ -82,6 +83,7 @@ from zerver.lib.send_email import ( EmailNotDeliveredException, FromAddress, handle_send_email_format_changes, + initialize_connection, send_email, send_future_email, ) @@ -169,6 +171,9 @@ def check_and_send_restart_signal() -> None: pass +# If you change the function on which this decorator is used be careful that the new +# function doesn't delete the "failed_tries" attribute of "data" which is needed for +# "retry_event" to work correctly; see EmailSendingWorker for an example with deepcopy. def retry_send_email_failures( func: Callable[[ConcreteQueueWorker, Dict[str, Any]], None], ) -> Callable[[ConcreteQueueWorker, Dict[str, Any]], None]: @@ -609,9 +614,13 @@ class MissedMessageWorker(QueueProcessingWorker): @assign_queue("email_senders") -class EmailSendingWorker(QueueProcessingWorker): +class EmailSendingWorker(LoopQueueProcessingWorker): + def __init__(self) -> None: + super().__init__() + self.connection: EmailBackend = initialize_connection(None) + @retry_send_email_failures - def consume(self, event: Dict[str, Any]) -> None: + def send_email(self, event: Dict[str, Any]) -> None: # Copy the event, so that we don't pass the `failed_tries' # data to send_email (which neither takes that # argument nor needs that data). @@ -619,7 +628,18 @@ class EmailSendingWorker(QueueProcessingWorker): if "failed_tries" in copied_event: del copied_event["failed_tries"] handle_send_email_format_changes(copied_event) - send_email(**copied_event) + self.connection = initialize_connection(self.connection) + send_email(**copied_event, connection=self.connection) + + def consume_batch(self, events: List[Dict[str, Any]]) -> None: + for event in events: + self.send_email(event) + + def stop(self) -> None: + try: + self.connection.close() + finally: + super().stop() @assign_queue("missedmessage_mobile_notifications")