email: Open a single SMTP connection to send email batches.

Previously the outgoing emails were sent over several SMTP
connections through the EmailSendingWorker; establishing a new
connection each time adds notable overhead.

Redefine EmailSendingWorker worker to be a LoopQueueProcessingWorker,
which allows it to handle batches of events. At the same time, persist
the connection across email sending, if possible.

The connection is initialized in the constructor of the worker
in order to keep the same connection throughout the whole process.
The concrete implementation of the consume_batch function is simply
processing each email one at a time until they have all been sent.

In order to reuse the previously implemented decorator to retry
sending failures a new method that meets the decorator's required
arguments is declared inside the EmailSendingWorker class. This
allows to retry the sending process of a particular email inside
the batch if the caught exception leaves this process retriable.

A second retry mechanism is used inside the initialize_connection
function to redo the opening of the connection until it works or
until three attempts failed. For this purpose the backoff module
has been added to the dependencies and a test has been added to
ensure that this retry mechanism works well.

The connection is closed when the stop method is called.

Fixes: #17672.
This commit is contained in:
Cyril Pletinckx 2021-03-20 14:07:02 +01:00 committed by Alex Vandiver
parent 0ad17925eb
commit 9afde790c6
8 changed files with 112 additions and 7 deletions

View File

@ -185,3 +185,6 @@ tlds
# Unicode Collation Algorithm for sorting multilingual strings # Unicode Collation Algorithm for sorting multilingual strings
pyuca pyuca
# Handle connection retries with exponential backoff
backoff

View File

@ -73,6 +73,10 @@ backcall==0.2.0 \
--hash=sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e \ --hash=sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e \
--hash=sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255 --hash=sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255
# via ipython # via ipython
backoff==1.10.0 \
--hash=sha256:5e73e2cbe780e1915a204799dba0a01896f45f4385e636bcca7a0614d879d0cd \
--hash=sha256:b8fba021fac74055ac05eb7c7bfce4723aedde6cd0a504e5326bcb0bdd6d19a4
# via -r requirements/common.in
beautifulsoup4==4.9.3 \ beautifulsoup4==4.9.3 \
--hash=sha256:4c98143716ef1cb40bf7f39a8e3eec8f8b009509e74904ba3a7b315431577e35 \ --hash=sha256:4c98143716ef1cb40bf7f39a8e3eec8f8b009509e74904ba3a7b315431577e35 \
--hash=sha256:84729e322ad1d5b4d25f805bfa05b902dd96450f43842c4e99067d5e1369eb25 \ --hash=sha256:84729e322ad1d5b4d25f805bfa05b902dd96450f43842c4e99067d5e1369eb25 \

View File

@ -45,6 +45,10 @@ backcall==0.2.0 \
--hash=sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e \ --hash=sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e \
--hash=sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255 --hash=sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255
# via ipython # via ipython
backoff==1.10.0 \
--hash=sha256:5e73e2cbe780e1915a204799dba0a01896f45f4385e636bcca7a0614d879d0cd \
--hash=sha256:b8fba021fac74055ac05eb7c7bfce4723aedde6cd0a504e5326bcb0bdd6d19a4
# via -r requirements/common.in
beautifulsoup4==4.9.3 \ beautifulsoup4==4.9.3 \
--hash=sha256:4c98143716ef1cb40bf7f39a8e3eec8f8b009509e74904ba3a7b315431577e35 \ --hash=sha256:4c98143716ef1cb40bf7f39a8e3eec8f8b009509e74904ba3a7b315431577e35 \
--hash=sha256:84729e322ad1d5b4d25f805bfa05b902dd96450f43842c4e99067d5e1369eb25 \ --hash=sha256:84729e322ad1d5b4d25f805bfa05b902dd96450f43842c4e99067d5e1369eb25 \

View File

@ -45,4 +45,4 @@ API_FEATURE_LEVEL = 57
# historical commits sharing the same major version, in which case a # historical commits sharing the same major version, in which case a
# minor version bump suffices. # minor version bump suffices.
PROVISION_VERSION = "141.2" PROVISION_VERSION = "141.3"

View File

@ -8,9 +8,12 @@ from email.policy import default
from email.utils import formataddr, parseaddr from email.utils import formataddr, parseaddr
from typing import Any, Dict, List, Mapping, Optional, Tuple from typing import Any, Dict, List, Mapping, Optional, Tuple
import backoff
import orjson import orjson
from django.conf import settings from django.conf import settings
from django.core.mail import EmailMultiAlternatives from django.core.mail import EmailMultiAlternatives, get_connection
from django.core.mail.backends.base import BaseEmailBackend
from django.core.mail.backends.smtp import EmailBackend
from django.core.mail.message import sanitize_address from django.core.mail.message import sanitize_address
from django.core.management import CommandError from django.core.management import CommandError
from django.db import transaction from django.db import transaction
@ -25,6 +28,8 @@ from scripts.setup.inline_email_css import inline_template
from zerver.lib.logging_util import log_to_file from zerver.lib.logging_util import log_to_file
from zerver.models import EMAIL_TYPES, Realm, ScheduledEmail, UserProfile, get_user_profile_by_id from zerver.models import EMAIL_TYPES, Realm, ScheduledEmail, UserProfile, get_user_profile_by_id
MAX_CONNECTION_TRIES = 3
## Logging setup ## ## Logging setup ##
logger = logging.getLogger("zulip.send_email") logger = logging.getLogger("zulip.send_email")
@ -207,6 +212,7 @@ def send_email(
language: Optional[str] = None, language: Optional[str] = None,
context: Dict[str, Any] = {}, context: Dict[str, Any] = {},
realm: Optional[Realm] = None, realm: Optional[Realm] = None,
connection: Optional[BaseEmailBackend] = None,
) -> None: ) -> None:
mail = build_email( mail = build_email(
template_prefix, template_prefix,
@ -222,11 +228,35 @@ def send_email(
template = template_prefix.split("/")[-1] template = template_prefix.split("/")[-1]
logger.info("Sending %s email to %s", template, mail.to) logger.info("Sending %s email to %s", template, mail.to)
if mail.send() == 0: if connection is None:
connection = get_connection()
# This will call .open() for us, which is a no-op if it's already open;
# it will only call .close() if it was not open to begin with
if connection.send_messages([mail]) == 0:
logger.error("Error sending %s email to %s", template, mail.to) logger.error("Error sending %s email to %s", template, mail.to)
raise EmailNotDeliveredException raise EmailNotDeliveredException
@backoff.on_exception(backoff.expo, OSError, max_tries=MAX_CONNECTION_TRIES, logger=None)
def initialize_connection(connection: Optional[BaseEmailBackend] = None) -> BaseEmailBackend:
if not connection:
connection = get_connection()
if connection.open():
# If it's a new connection, no need to no-op to check connectivity
return connection
# No-op to ensure that we don't return a connection that has been closed by the mail server
if isinstance(connection, EmailBackend):
try:
status = connection.connection.noop()[0]
except Exception:
status = -1
if status != 250:
# Close and connect again.
connection.close()
connection.open()
return connection
def send_future_email( def send_future_email(
template_prefix: str, template_prefix: str,
realm: Realm, realm: Realm,

View File

@ -690,6 +690,9 @@ def mock_queue_publish(
) -> Iterator[mock.MagicMock]: ) -> Iterator[mock.MagicMock]:
inner = mock.MagicMock(**kwargs) inner = mock.MagicMock(**kwargs)
# This helper ensures that events published to the queues are
# serializable as JSON; unserializable events would make RabbitMQ
# crash in production.
def verify_serialize( def verify_serialize(
queue_name: str, queue_name: str,
event: Dict[str, object], event: Dict[str, object],

View File

@ -1,6 +1,11 @@
import smtplib
from unittest import mock
from django.core.mail.backends.locmem import EmailBackend
from django.core.mail.backends.smtp import EmailBackend as SMTPBackend
from django.core.mail.message import sanitize_address from django.core.mail.message import sanitize_address
from zerver.lib.send_email import FromAddress, build_email from zerver.lib.send_email import FromAddress, build_email, initialize_connection
from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_classes import ZulipTestCase
OVERLY_LONG_NAME = "Z̷̧̙̯͙̠͇̰̲̞̙͆́͐̅̌͐̔͑̚u̷̼͎̹̻̻̣̞͈̙͛͑̽̉̾̀̅̌͜͠͞ļ̛̫̻̫̰̪̩̠̣̼̏̅́͌̊͞į̴̛̛̩̜̜͕̘̂̑̀̈p̡̛͈͖͓̟͍̿͒̍̽͐͆͂̀ͅ A̰͉̹̅̽̑̕͜͟͡c̷͚̙̘̦̞̫̭͗̋͋̾̑͆̒͟͞c̵̗̹̣̲͚̳̳̮͋̈́̾̉̂͝ͅo̠̣̻̭̰͐́͛̄̂̿̏͊u̴̱̜̯̭̞̠͋͛͐̍̄n̸̡̘̦͕͓̬͌̂̎͊͐̎͌̕ť̮͎̯͎̣̙̺͚̱̌̀́̔͢͝ S͇̯̯̙̳̝͆̊̀͒͛̕ę̛̘̬̺͎͎́̔̊̀͂̓̆̕͢ͅc̨͎̼̯̩̽͒̀̏̄̌̚u̷͉̗͕̼̮͎̬͓͋̃̀͂̈̂̈͊͛ř̶̡͔̺̱̹͓̺́̃̑̉͡͞ͅi̶̺̭͈̬̞̓̒̃͆̅̿̀̄́t͔̹̪͔̥̣̙̍̍̍̉̑̏͑́̌ͅŷ̧̗͈͚̥̗͚͊͑̀͢͜͡" OVERLY_LONG_NAME = "Z̷̧̙̯͙̠͇̰̲̞̙͆́͐̅̌͐̔͑̚u̷̼͎̹̻̻̣̞͈̙͛͑̽̉̾̀̅̌͜͠͞ļ̛̫̻̫̰̪̩̠̣̼̏̅́͌̊͞į̴̛̛̩̜̜͕̘̂̑̀̈p̡̛͈͖͓̟͍̿͒̍̽͐͆͂̀ͅ A̰͉̹̅̽̑̕͜͟͡c̷͚̙̘̦̞̫̭͗̋͋̾̑͆̒͟͞c̵̗̹̣̲͚̳̳̮͋̈́̾̉̂͝ͅo̠̣̻̭̰͐́͛̄̂̿̏͊u̴̱̜̯̭̞̠͋͛͐̍̄n̸̡̘̦͕͓̬͌̂̎͊͐̎͌̕ť̮͎̯͎̣̙̺͚̱̌̀́̔͢͝ S͇̯̯̙̳̝͆̊̀͒͛̕ę̛̘̬̺͎͎́̔̊̀͂̓̆̕͢ͅc̨͎̼̯̩̽͒̀̏̄̌̚u̷͉̗͕̼̮͎̬͓͋̃̀͂̈̂̈͊͛ř̶̡͔̺̱̹͓̺́̃̑̉͡͞ͅi̶̺̭͈̬̞̓̒̃͆̅̿̀̄́t͔̹̪͔̥̣̙̍̍̍̉̑̏͑́̌ͅŷ̧̗͈͚̥̗͚͊͑̀͢͜͡"
@ -57,3 +62,39 @@ class TestBuildEmail(ZulipTestCase):
language="en", language="en",
) )
self.assertEqual(mail.extra_headers["From"], FromAddress.NOREPLY) self.assertEqual(mail.extra_headers["From"], FromAddress.NOREPLY)
class TestSendEmail(ZulipTestCase):
def test_initialize_connection(self) -> None:
# Test the new connection case
with mock.patch.object(EmailBackend, "open", return_value=True):
backend = initialize_connection(None)
self.assertTrue(isinstance(backend, EmailBackend))
backend = mock.MagicMock(spec=SMTPBackend)
backend.connection = mock.MagicMock(spec=smtplib.SMTP)
self.assertTrue(isinstance(backend, SMTPBackend))
# Test the old connection case when it is still open
backend.open.return_value = False
backend.connection.noop.return_value = [250]
initialize_connection(backend)
self.assertEqual(backend.open.call_count, 1)
self.assertEqual(backend.connection.noop.call_count, 1)
# Test the old connection case when it was closed by the server
backend.connection.noop.return_value = [404]
backend.close.return_value = False
initialize_connection(backend)
# 2 more calls to open, 1 more call to noop and 1 call to close
self.assertEqual(backend.open.call_count, 3)
self.assertEqual(backend.connection.noop.call_count, 2)
self.assertEqual(backend.close.call_count, 1)
# Test backoff procedure
backend.open.side_effect = OSError
with self.assertRaises(OSError):
initialize_connection(backend)
# 3 more calls to open as we try 3 times before giving up
self.assertEqual(backend.open.call_count, 6)

View File

@ -37,6 +37,7 @@ from typing import (
import orjson import orjson
import sentry_sdk import sentry_sdk
from django.conf import settings from django.conf import settings
from django.core.mail.backends.smtp import EmailBackend
from django.db import connection from django.db import connection
from django.db.models import F from django.db.models import F
from django.utils.timezone import now as timezone_now from django.utils.timezone import now as timezone_now
@ -82,6 +83,7 @@ from zerver.lib.send_email import (
EmailNotDeliveredException, EmailNotDeliveredException,
FromAddress, FromAddress,
handle_send_email_format_changes, handle_send_email_format_changes,
initialize_connection,
send_email, send_email,
send_future_email, send_future_email,
) )
@ -169,6 +171,9 @@ def check_and_send_restart_signal() -> None:
pass pass
# If you change the function on which this decorator is used be careful that the new
# function doesn't delete the "failed_tries" attribute of "data" which is needed for
# "retry_event" to work correctly; see EmailSendingWorker for an example with deepcopy.
def retry_send_email_failures( def retry_send_email_failures(
func: Callable[[ConcreteQueueWorker, Dict[str, Any]], None], func: Callable[[ConcreteQueueWorker, Dict[str, Any]], None],
) -> Callable[[ConcreteQueueWorker, Dict[str, Any]], None]: ) -> Callable[[ConcreteQueueWorker, Dict[str, Any]], None]:
@ -609,9 +614,13 @@ class MissedMessageWorker(QueueProcessingWorker):
@assign_queue("email_senders") @assign_queue("email_senders")
class EmailSendingWorker(QueueProcessingWorker): class EmailSendingWorker(LoopQueueProcessingWorker):
def __init__(self) -> None:
super().__init__()
self.connection: EmailBackend = initialize_connection(None)
@retry_send_email_failures @retry_send_email_failures
def consume(self, event: Dict[str, Any]) -> None: def send_email(self, event: Dict[str, Any]) -> None:
# Copy the event, so that we don't pass the `failed_tries' # Copy the event, so that we don't pass the `failed_tries'
# data to send_email (which neither takes that # data to send_email (which neither takes that
# argument nor needs that data). # argument nor needs that data).
@ -619,7 +628,18 @@ class EmailSendingWorker(QueueProcessingWorker):
if "failed_tries" in copied_event: if "failed_tries" in copied_event:
del copied_event["failed_tries"] del copied_event["failed_tries"]
handle_send_email_format_changes(copied_event) handle_send_email_format_changes(copied_event)
send_email(**copied_event) self.connection = initialize_connection(self.connection)
send_email(**copied_event, connection=self.connection)
def consume_batch(self, events: List[Dict[str, Any]]) -> None:
for event in events:
self.send_email(event)
def stop(self) -> None:
try:
self.connection.close()
finally:
super().stop()
@assign_queue("missedmessage_mobile_notifications") @assign_queue("missedmessage_mobile_notifications")