mirror of https://github.com/zulip/zulip.git
172 lines
7.0 KiB
Python
172 lines
7.0 KiB
Python
from smtplib import SMTP, SMTPDataError, SMTPException, SMTPRecipientsRefused
|
|
from unittest import mock
|
|
|
|
from django.core.mail.backends.locmem import EmailBackend
|
|
from django.core.mail.backends.smtp import EmailBackend as SMTPBackend
|
|
from django.core.mail.message import sanitize_address
|
|
|
|
from zerver.lib.send_email import (
|
|
EmailNotDeliveredException,
|
|
FromAddress,
|
|
build_email,
|
|
initialize_connection,
|
|
logger,
|
|
send_email,
|
|
)
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
|
|
|
|
class TestBuildEmail(ZulipTestCase):
|
|
def test_build_SES_compatible_From_field_limit(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
limit_length_name = "a" * (320 - len(sanitize_address(FromAddress.NOREPLY, "utf-8")) - 3)
|
|
mail = build_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name=limit_length_name,
|
|
from_address=FromAddress.NOREPLY,
|
|
language="en",
|
|
)
|
|
self.assertEqual(
|
|
mail.extra_headers["From"], "{} <{}>".format(limit_length_name, FromAddress.NOREPLY)
|
|
)
|
|
|
|
def test_build_and_send_SES_incompatible_From_address(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
from_name = "Zulip"
|
|
# Address by itself is > 320 bytes even without the name. Should trigger exception.
|
|
overly_long_address = "a" * 320 + "@zulip.com"
|
|
mail = build_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name=from_name,
|
|
from_address=overly_long_address,
|
|
language="en",
|
|
)
|
|
self.assertEqual(mail.extra_headers["From"], overly_long_address)
|
|
self.assertTrue(len(sanitize_address(mail.extra_headers["From"], "utf-8")) > 320)
|
|
|
|
with mock.patch.object(
|
|
EmailBackend, "send_messages", side_effect=SMTPDataError(242, "From field too long.")
|
|
):
|
|
with self.assertLogs(logger=logger) as info_log:
|
|
with self.assertRaises(EmailNotDeliveredException):
|
|
send_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name=from_name,
|
|
from_address=overly_long_address,
|
|
language="en",
|
|
)
|
|
self.assert_length(info_log.records, 2)
|
|
self.assertEqual(
|
|
info_log.output[0], f"INFO:{logger.name}:Sending password_reset email to {mail.to}"
|
|
)
|
|
self.assertTrue(
|
|
info_log.output[1].startswith(
|
|
f"ERROR:{logger.name}:Error sending password_reset email to {mail.to} with error code 242: From field too long."
|
|
)
|
|
)
|
|
|
|
def test_build_and_send_refused(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
address = "zulip@example.com"
|
|
with mock.patch.object(
|
|
EmailBackend,
|
|
"send_messages",
|
|
side_effect=SMTPRecipientsRefused(recipients={address: (550, b"User unknown")}),
|
|
):
|
|
with self.assertLogs(logger=logger) as info_log:
|
|
with self.assertRaises(EmailNotDeliveredException):
|
|
send_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name="Zulip",
|
|
from_address=address,
|
|
language="en",
|
|
)
|
|
self.assert_length(info_log.records, 2)
|
|
self.assertEqual(
|
|
info_log.output[0], f"INFO:{logger.name}:Sending password_reset email to {[hamlet]}"
|
|
)
|
|
self.assertFalse(
|
|
info_log.output[1].startswith(
|
|
f"ERROR:{logger.name}:Error sending password_reset email to {[hamlet]}: {{'{address}': (550, 'User unknown')}}"
|
|
)
|
|
)
|
|
|
|
|
|
class TestSendEmail(ZulipTestCase):
|
|
def test_initialize_connection(self) -> None:
|
|
# Test the new connection case
|
|
with mock.patch.object(EmailBackend, "open", return_value=True):
|
|
backend = initialize_connection(None)
|
|
self.assertTrue(isinstance(backend, EmailBackend))
|
|
|
|
backend = mock.MagicMock(spec=SMTPBackend)
|
|
backend.connection = mock.MagicMock(spec=SMTP)
|
|
|
|
self.assertTrue(isinstance(backend, SMTPBackend))
|
|
|
|
# Test the old connection case when it is still open
|
|
backend.open.return_value = False
|
|
backend.connection.noop.return_value = [250]
|
|
initialize_connection(backend)
|
|
self.assertEqual(backend.open.call_count, 1)
|
|
self.assertEqual(backend.connection.noop.call_count, 1)
|
|
|
|
# Test the old connection case when it was closed by the server
|
|
backend.connection.noop.return_value = [404]
|
|
backend.close.return_value = False
|
|
initialize_connection(backend)
|
|
# 2 more calls to open, 1 more call to noop and 1 call to close
|
|
self.assertEqual(backend.open.call_count, 3)
|
|
self.assertEqual(backend.connection.noop.call_count, 2)
|
|
self.assertEqual(backend.close.call_count, 1)
|
|
|
|
# Test backoff procedure
|
|
backend.open.side_effect = OSError
|
|
with self.assertRaises(OSError):
|
|
initialize_connection(backend)
|
|
# 3 more calls to open as we try 3 times before giving up
|
|
self.assertEqual(backend.open.call_count, 6)
|
|
|
|
def test_send_email_exceptions(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
from_name = FromAddress.security_email_from_name(language="en")
|
|
# Used to check the output
|
|
mail = build_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name=from_name,
|
|
from_address=FromAddress.NOREPLY,
|
|
language="en",
|
|
)
|
|
self.assertEqual(
|
|
mail.extra_headers["From"], "{} <{}>".format(from_name, FromAddress.NOREPLY)
|
|
)
|
|
|
|
# We test the two cases that should raise an EmailNotDeliveredException
|
|
errors = {
|
|
f"Unknown error sending password_reset email to {mail.to}": [0],
|
|
f"Error sending password_reset email to {mail.to}": [SMTPException()],
|
|
}
|
|
|
|
for message, side_effect in errors.items():
|
|
with mock.patch.object(EmailBackend, "send_messages", side_effect=side_effect):
|
|
with self.assertLogs(logger=logger) as info_log:
|
|
with self.assertRaises(EmailNotDeliveredException):
|
|
send_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name=from_name,
|
|
from_address=FromAddress.NOREPLY,
|
|
language="en",
|
|
)
|
|
self.assert_length(info_log.records, 2)
|
|
self.assertEqual(
|
|
info_log.output[0],
|
|
f"INFO:{logger.name}:Sending password_reset email to {mail.to}",
|
|
)
|
|
self.assertTrue(info_log.output[1].startswith(f"ERROR:zulip.send_email:{message}"))
|