email: Migrate to new Python ≥ 3.3 email API.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2020-06-05 14:26:35 -07:00 committed by Tim Abbott
parent 62f1a9da26
commit bff3dcadc8
15 changed files with 199 additions and 191 deletions

View File

@ -1,8 +1,6 @@
import logging
import re
from email.header import decode_header, make_header
from email.message import Message as EmailMessage
from email.utils import getaddresses
from email.message import EmailMessage
from typing import Dict, List, Optional, Tuple
from django.conf import settings
@ -167,17 +165,11 @@ def construct_zulip_body(message: EmailMessage, realm: Realm, show_sender: bool=
body = '(No email body)'
if show_sender:
sender = handle_header_content(message.get("From", ""))
sender = str(message.get("From", ""))
body = f"From: {sender}\n{body}"
return body
def handle_header_content(content: str) -> str:
"""
Deals with converting encoded headers to readable python string.
"""
return str(make_header(decode_header(content)))
## Sending the Zulip ##
class ZulipEmailForwardUserError(ZulipEmailForwardError):
@ -280,11 +272,7 @@ def extract_and_upload_attachments(message: EmailMessage, realm: Realm) -> str:
attachment_links = []
for part in message.walk():
content_type = part.get_content_type()
encoded_filename = part.get_filename()
if not encoded_filename:
continue
filename = handle_header_content(encoded_filename)
filename = part.get_filename()
if filename:
attachment = part.get_payload(decode=True)
if isinstance(attachment, bytes):
@ -320,13 +308,11 @@ def find_emailgateway_recipient(message: EmailMessage) -> str:
pattern_parts = [re.escape(part) for part in settings.EMAIL_GATEWAY_PATTERN.split('%s')]
match_email_re = re.compile(".*?".join(pattern_parts))
header_addresses = [str(addr)
for recipient_header in recipient_headers
for addr in message.get_all(recipient_header, [])]
for addr_tuple in getaddresses(header_addresses):
if match_email_re.match(addr_tuple[1]):
return addr_tuple[1]
for header_name in recipient_headers:
for header_value in message.get_all(header_name, []):
for addr in header_value.addresses:
if match_email_re.match(addr.addr_spec):
return addr.addr_spec
raise ZulipEmailForwardError("Missing recipient in mirror email")
@ -344,7 +330,7 @@ def is_forwarded(subject: str) -> bool:
return bool(re.match(reg, subject, flags=re.IGNORECASE))
def process_stream_message(to: str, message: EmailMessage) -> None:
subject_header = handle_header_content(message.get("Subject", ""))
subject_header = message.get("Subject", "")
subject = strip_from_subject(subject_header) or "(no topic)"
stream, options = decode_stream_email_address(to)

View File

@ -1,7 +1,7 @@
import re
from collections import defaultdict
from datetime import timedelta
from email.utils import formataddr
from email.headerregistry import Address
from typing import Any, Dict, Iterable, List, Optional, Tuple
import html2text
@ -347,7 +347,7 @@ def do_send_missedmessage_events_reply_in_zulip(user_profile: UserProfile,
from zerver.lib.email_mirror import create_missed_message_address
reply_to_address = create_missed_message_address(user_profile, missed_messages[0]['message'])
if reply_to_address == FromAddress.NOREPLY:
reply_to_name = None
reply_to_name = ""
else:
reply_to_name = "Zulip"
@ -435,7 +435,7 @@ def do_send_missedmessage_events_reply_in_zulip(user_profile: UserProfile,
'to_user_ids': [user_profile.id],
'from_name': from_name,
'from_address': from_address,
'reply_to_email': formataddr((reply_to_name, reply_to_address)),
'reply_to_email': str(Address(display_name=reply_to_name, addr_spec=reply_to_address)),
'context': context}
queue_json_publish("email_senders", email_dict)

View File

@ -2,9 +2,10 @@ import datetime
import hashlib
import logging
import os
from email.headerregistry import Address
from email.parser import Parser
from email.policy import default
from email.utils import formataddr, parseaddr
from email.utils import parseaddr
from typing import Any, Dict, List, Mapping, Optional, Tuple
import ujson
@ -61,7 +62,7 @@ def build_email(template_prefix: str, to_user_ids: Optional[List[int]]=None,
assert (to_user_ids is None) ^ (to_emails is None)
if to_user_ids is not None:
to_users = [get_user_profile_by_id(to_user_id) for to_user_id in to_user_ids]
to_emails = [formataddr((to_user.full_name, to_user.delivery_email)) for to_user in to_users]
to_emails = [str(Address(display_name=to_user.full_name, addr_spec=to_user.delivery_email)) for to_user in to_users]
context = {
**context,
@ -107,7 +108,7 @@ def build_email(template_prefix: str, to_user_ids: Optional[List[int]]=None,
if from_address == FromAddress.support_placeholder:
from_address = FromAddress.SUPPORT
from_email = formataddr((from_name, from_address))
from_email = str(Address(display_name=from_name, addr_spec=from_address))
reply_to = None
if reply_to_email is not None:
reply_to = [reply_to_email]

View File

@ -5,7 +5,6 @@ import shutil
import tempfile
import urllib
from contextlib import contextmanager
from email.utils import parseaddr
from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Tuple, Union, cast
from unittest import mock
@ -473,7 +472,10 @@ class ZulipTestCase(TestCase):
# This is a bit of a crude heuristic, but good enough for most tests.
url_pattern = settings.EXTERNAL_HOST + r"(\S+)>"
for message in reversed(outbox):
if email_address in parseaddr(message.to)[1]:
if any(
addr == email_address or addr.endswith(f" <{email_address}>")
for addr in message.to
):
return re.search(url_pattern, message.body).groups()[0]
else:
raise AssertionError("Couldn't find a confirmation email.")

View File

@ -18,8 +18,9 @@ recipient address and retrieve, forward, and archive the message.
"""
import email
import email.policy
import logging
from email.message import Message
from email.message import EmailMessage
from imaplib import IMAP4_SSL
from typing import Any, Generator
@ -40,7 +41,7 @@ logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
def get_imap_messages() -> Generator[Message, None, None]:
def get_imap_messages() -> Generator[EmailMessage, None, None]:
mbox = IMAP4_SSL(settings.EMAIL_GATEWAY_IMAP_SERVER, settings.EMAIL_GATEWAY_IMAP_PORT)
mbox.login(settings.EMAIL_GATEWAY_LOGIN, settings.EMAIL_GATEWAY_PASSWORD)
try:
@ -51,7 +52,8 @@ def get_imap_messages() -> Generator[Message, None, None]:
status, msg_data = mbox.fetch(message_id, '(RFC822)')
assert isinstance(msg_data[0], tuple)
msg_as_bytes = msg_data[0][1]
message = email.message_from_bytes(msg_as_bytes)
message = email.message_from_bytes(msg_as_bytes, policy=email.policy.default)
assert isinstance(message, EmailMessage) # https://github.com/python/typeshed/issues/2417
yield message
mbox.store(message_id, '+FLAGS', '\\Deleted')
mbox.expunge()

View File

@ -1,7 +1,7 @@
import email
import email.policy
import os
from email.message import Message
from email.mime.text import MIMEText
from email.message import EmailMessage
from typing import Dict, Optional
import ujson
@ -17,7 +17,7 @@ from zerver.models import Realm, get_realm, get_stream
# to the email mirror. Simple emails can be passed in a JSON file,
# Look at zerver/tests/fixtures/email/1.json for an example of how
# it should look. You can also pass a file which has the raw email,
# for example by writing an email.message.Message type object
# for example by writing an email.message.EmailMessage type object
# to a file using as_string() or as_bytes() methods, or copy-pasting
# the content of "Show original" on an email in Gmail.
# See zerver/tests/fixtures/email/1.txt for a very simple example,
@ -66,40 +66,41 @@ Example:
full_fixture_path = os.path.join(settings.DEPLOY_ROOT, options['fixture'])
# parse the input email into Message type and prepare to process_message() it
# parse the input email into EmailMessage type and prepare to process_message() it
message = self._parse_email_fixture(full_fixture_path)
self._prepare_message(message, realm, stream)
data: Dict[str, str] = {}
data['recipient'] = str(message['To']) # Need str() here to avoid mypy throwing an error
data['recipient'] = message['To'].addresses[0].addr_spec
data['msg_text'] = message.as_string()
mirror_email_message(data)
def _does_fixture_path_exist(self, fixture_path: str) -> bool:
return os.path.exists(fixture_path)
def _parse_email_json_fixture(self, fixture_path: str) -> Message:
def _parse_email_json_fixture(self, fixture_path: str) -> EmailMessage:
with open(fixture_path) as fp:
json_content = ujson.load(fp)[0]
message = MIMEText(json_content['body'])
message = EmailMessage()
message['From'] = json_content['from']
message['Subject'] = json_content['subject']
message.set_content(json_content['body'])
return message
def _parse_email_fixture(self, fixture_path: str) -> Message:
def _parse_email_fixture(self, fixture_path: str) -> EmailMessage:
if not self._does_fixture_path_exist(fixture_path):
raise CommandError(f'Fixture {fixture_path} does not exist')
if fixture_path.endswith('.json'):
message = self._parse_email_json_fixture(fixture_path)
return self._parse_email_json_fixture(fixture_path)
else:
with open(fixture_path, "rb") as fp:
message = email.message_from_binary_file(fp)
message = email.message_from_binary_file(fp, policy=email.policy.default)
assert isinstance(message, EmailMessage) # https://github.com/python/typeshed/issues/2417
return message
return message
def _prepare_message(self, message: Message, realm: Realm, stream_name: str) -> None:
def _prepare_message(self, message: EmailMessage, realm: Realm, stream_name: str) -> None:
stream = get_stream(stream_name, realm)
# The block below ensures that the imported email message doesn't have any recipient-like

View File

@ -1,6 +1,4 @@
import datetime
import re
from email.utils import parseaddr
from django.core import mail
from django.utils.timezone import now
@ -104,11 +102,11 @@ class EmailChangeTestCase(ZulipTestCase):
'Verify your new email address',
)
body = email_message.body
from_email = email_message.from_email
self.assertIn('We received a request to change the email', body)
self.assertIn('Zulip Account Security', from_email)
tokenized_no_reply_email = parseaddr(email_message.from_email)[1]
self.assertTrue(re.search(self.TOKENIZED_NOREPLY_REGEX, tokenized_no_reply_email))
self.assertRegex(
email_message.from_email,
fr"^Zulip Account Security <{self.TOKENIZED_NOREPLY_REGEX}>\Z",
)
activation_url = [s for s in body.split('\n') if s][2]
response = self.client_get(activation_url)

View File

@ -1,9 +1,8 @@
import email.policy
import os
import subprocess
from email import message_from_string
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.message import EmailMessage, MIMEPart
from typing import Any, Callable, Dict, Mapping, Optional
from unittest import mock
@ -215,7 +214,8 @@ class TestStreamEmailMessagesSuccess(ZulipTestCase):
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEText('TestStreamEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestStreamEmailMessages Body')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -239,7 +239,8 @@ class TestStreamEmailMessagesSuccess(ZulipTestCase):
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEText('TestStreamEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestStreamEmailMessages Body')
incoming_valid_message['Subject'] = ''
incoming_valid_message['From'] = self.example_email('hamlet')
@ -264,7 +265,8 @@ class TestStreamEmailMessagesSuccess(ZulipTestCase):
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEText('TestStreamEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestStreamEmailMessages Body')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -290,7 +292,8 @@ class TestStreamEmailMessagesSuccess(ZulipTestCase):
stream_to_addresses = ["A.N. Other <another@example.org>",
f"Denmark <{encode_email_address(stream)}>"]
incoming_valid_message = MIMEText('TestStreamEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestStreamEmailMessages Body')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -317,7 +320,8 @@ class TestStreamEmailMessagesSuccess(ZulipTestCase):
parts[0] += "+show-sender"
stream_to_address = '@'.join(parts)
incoming_valid_message = MIMEText('TestStreamEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestStreamEmailMessages Body')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
incoming_valid_message['To'] = stream_to_address
@ -342,9 +346,10 @@ class TestStreamEmailMessagesSuccess(ZulipTestCase):
parts[0] += "+show-sender"
stream_to_address = '@'.join(parts)
incoming_valid_message = MIMEText('TestStreamEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestStreamEmailMessages Body')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = '=?utf-8?b?VGVzdCBVc2Vyw7PEhcSZIDxoYW1sZXRfxJlAenVsaXAuY29tPg==?='
incoming_valid_message['From'] = 'Test =?utf-8?b?VXNlcsOzxIXEmQ==?= <=?utf-8?q?hamlet=5F=C4=99?=@zulip.com>'
incoming_valid_message['To'] = stream_to_address
incoming_valid_message['Reply-to'] = self.example_email('othello')
@ -371,7 +376,8 @@ class TestStreamEmailMessagesSuccess(ZulipTestCase):
--
Footer"""
incoming_valid_message = MIMEText(text)
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content(text)
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
incoming_valid_message['To'] = stream_to_address
@ -401,7 +407,8 @@ class TestStreamEmailMessagesSuccess(ZulipTestCase):
Quote"""
incoming_valid_message = MIMEText(text)
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content(text)
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
incoming_valid_message['To'] = stream_to_address
@ -422,15 +429,17 @@ class TestEmailMirrorMessagesWithAttachments(ZulipTestCase):
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEMultipart()
text_msg = MIMEText("Test body")
incoming_valid_message.attach(text_msg)
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content("Test body")
with open(os.path.join(settings.DEPLOY_ROOT, "static/images/default-avatar.png"), 'rb') as f:
image_bytes = f.read()
attachment_msg = MIMEImage(image_bytes)
attachment_msg.add_header('Content-Disposition', 'attachment', filename="image.png")
incoming_valid_message.attach(attachment_msg)
incoming_valid_message.add_attachment(
image_bytes,
maintype="image",
subtype="png",
filename="image.png",
)
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -455,17 +464,18 @@ class TestEmailMirrorMessagesWithAttachments(ZulipTestCase):
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEMultipart()
text_msg = MIMEText("Test body")
incoming_valid_message.attach(text_msg)
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content("Test body")
with open(os.path.join(settings.DEPLOY_ROOT, "static/images/default-avatar.png"), 'rb') as f:
image_bytes = f.read()
attachment_msg = MIMEImage(image_bytes)
utf8_filename = "image_ąęó.png"
encoded_filename = "=?utf-8?b?aW1hZ2VfxIXEmcOzLnBuZw==?="
attachment_msg.add_header('Content-Disposition', 'attachment', filename=encoded_filename)
incoming_valid_message.attach(attachment_msg)
incoming_valid_message.add_attachment(
image_bytes,
maintype="image",
subtype="png",
filename=utf8_filename,
)
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -490,20 +500,21 @@ class TestEmailMirrorMessagesWithAttachments(ZulipTestCase):
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEMultipart()
text_msg = MIMEText("Test body")
incoming_valid_message.attach(text_msg)
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content("Test body")
nested_multipart = MIMEMultipart()
nested_text_message = MIMEText("Nested text that should get skipped.")
nested_multipart.attach(nested_text_message)
nested_multipart = EmailMessage()
nested_multipart.set_content("Nested text that should get skipped.")
with open(os.path.join(settings.DEPLOY_ROOT, "static/images/default-avatar.png"), 'rb') as f:
image_bytes = f.read()
attachment_msg = MIMEImage(image_bytes)
attachment_msg.add_header('Content-Disposition', 'attachment', filename="image.png")
nested_multipart.attach(attachment_msg)
incoming_valid_message.attach(nested_multipart)
nested_multipart.add_attachment(
image_bytes,
maintype="image",
subtype="png",
filename="image.png",
)
incoming_valid_message.add_attachment(nested_multipart)
incoming_valid_message['Subject'] = 'Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -528,13 +539,12 @@ class TestEmailMirrorMessagesWithAttachments(ZulipTestCase):
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEMultipart()
text_msg = MIMEText("Test body")
incoming_valid_message.attach(text_msg)
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content("Test body")
# Create an invalid attachment:
attachment_msg = MIMEMultipart()
attachment_msg = MIMEPart()
attachment_msg.add_header('Content-Disposition', 'attachment', filename="some_attachment")
incoming_valid_message.attach(attachment_msg)
incoming_valid_message.add_attachment(attachment_msg)
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -559,11 +569,9 @@ class TestEmailMirrorMessagesWithAttachments(ZulipTestCase):
text = "Test message"
html = "<html><body><b>Test html message</b></body></html>"
incoming_valid_message = MIMEMultipart()
text_message = MIMEText(text)
html_message = MIMEText(html, 'html')
incoming_valid_message.attach(text_message)
incoming_valid_message.attach(html_message)
incoming_valid_message = EmailMessage()
incoming_valid_message.add_alternative(text)
incoming_valid_message.add_alternative(html, subtype="html")
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -594,11 +602,9 @@ class TestEmailMirrorMessagesWithAttachments(ZulipTestCase):
# This should be correctly identified as empty html body:
html = "<html><body></body></html>"
incoming_valid_message = MIMEMultipart()
text_message = MIMEText(text)
html_message = MIMEText(html, 'html')
incoming_valid_message.attach(text_message)
incoming_valid_message.attach(html_message)
incoming_valid_message = EmailMessage()
incoming_valid_message.add_alternative(text)
incoming_valid_message.add_alternative(html, subtype="html")
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -622,7 +628,8 @@ class TestStreamEmailMessagesEmptyBody(ZulipTestCase):
stream_to_address = encode_email_address(stream)
# empty body
incoming_valid_message = MIMEText('')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -640,9 +647,13 @@ class TestStreamEmailMessagesEmptyBody(ZulipTestCase):
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
# No textual body
incoming_valid_message = MIMEMultipart()
incoming_valid_message = EmailMessage()
with open(os.path.join(settings.DEPLOY_ROOT, "static/images/default-avatar.png"), 'rb') as f:
incoming_valid_message.attach(MIMEImage(f.read()))
incoming_valid_message.add_attachment(
f.read(),
maintype="image",
subtype="png",
)
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -664,7 +675,8 @@ class TestStreamEmailMessagesEmptyBody(ZulipTestCase):
headers['Reply-To'] = self.example_email('othello')
# empty body
incoming_valid_message = MIMEText('-- \nFooter')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('-- \nFooter')
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('hamlet')
@ -697,7 +709,8 @@ class TestMissedMessageEmailMessages(ZulipTestCase):
# token for looking up who did reply.
mm_address = create_missed_message_address(user_profile, usermessage.message)
incoming_valid_message = MIMEText('TestMissedMessageEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestMissedMessageEmailMessages Body')
incoming_valid_message['Subject'] = 'TestMissedMessageEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('othello')
@ -736,7 +749,8 @@ class TestMissedMessageEmailMessages(ZulipTestCase):
# token for looking up who did reply.
mm_address = create_missed_message_address(user_profile, usermessage.message)
incoming_valid_message = MIMEText('TestMissedHuddleMessageEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestMissedHuddleMessageEmailMessages Body')
incoming_valid_message['Subject'] = 'TestMissedHuddleMessageEmailMessages Subject'
incoming_valid_message['From'] = self.example_email('cordelia')
@ -782,7 +796,8 @@ class TestMissedMessageEmailMessages(ZulipTestCase):
mm_address = create_missed_message_address(user_profile, usermessage.message)
incoming_valid_message = MIMEText('TestMissedMessageEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestMissedMessageEmailMessages Body')
incoming_valid_message['Subject'] = 'TestMissedMessageEmailMessages Subject'
incoming_valid_message['From'] = user_profile.delivery_email
@ -821,7 +836,8 @@ class TestMissedMessageEmailMessages(ZulipTestCase):
usermessage.message.subject = "updated topic"
usermessage.message.save(update_fields=["subject"])
incoming_valid_message = MIMEText('TestMissedMessageEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestMissedMessageEmailMessages Body')
incoming_valid_message['Subject'] = 'TestMissedMessageEmailMessages Subject'
incoming_valid_message['From'] = user_profile.delivery_email
@ -858,7 +874,8 @@ class TestMissedMessageEmailMessages(ZulipTestCase):
do_deactivate_user(user_profile)
incoming_valid_message = MIMEText('TestMissedMessageEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestMissedMessageEmailMessages Body')
incoming_valid_message['Subject'] = 'TestMissedMessageEmailMessages Subject'
incoming_valid_message['From'] = user_profile.delivery_email
@ -889,7 +906,8 @@ class TestMissedMessageEmailMessages(ZulipTestCase):
do_deactivate_realm(user_profile.realm)
incoming_valid_message = MIMEText('TestMissedMessageEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestMissedMessageEmailMessages Body')
incoming_valid_message['Subject'] = 'TestMissedMessageEmailMessages Subject'
incoming_valid_message['From'] = user_profile.delivery_email
@ -918,7 +936,8 @@ class TestMissedMessageEmailMessages(ZulipTestCase):
message = most_recent_message(user_profile)
mm_address = create_missed_message_address(user_profile, message)
incoming_valid_message = MIMEText('TestMissedMessageEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestMissedMessageEmailMessages Body')
incoming_valid_message['Subject'] = 'TestMissedMessageEmailMessages Subject'
incoming_valid_message['From'] = user_profile.delivery_email
@ -986,7 +1005,8 @@ class TestReplyExtraction(ZulipTestCase):
Quote"""
incoming_valid_message = MIMEText(text)
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content(text)
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = user_profile.delivery_email
@ -1036,7 +1056,8 @@ class TestReplyExtraction(ZulipTestCase):
</html>
"""
incoming_valid_message = MIMEText(html, 'html')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content(html, subtype="html")
incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
incoming_valid_message['From'] = user_profile.delivery_email
@ -1208,7 +1229,8 @@ class TestStreamEmailMessagesSubjectStripping(ZulipTestCase):
self.subscribe(user_profile, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
incoming_valid_message = MIMEText('TestStreamEmailMessages Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('TestStreamEmailMessages Body')
incoming_valid_message['Subject'] = "Re: Fwd: Re: Test"
incoming_valid_message['From'] = self.example_email('hamlet')
incoming_valid_message['To'] = stream_to_address
@ -1238,7 +1260,8 @@ class TestContentTypeUnspecifiedCharset(ZulipTestCase):
message_as_string = self.fixture_data('1.txt', type='email')
message_as_string = message_as_string.replace("Content-Type: text/plain; charset=\"us-ascii\"",
"Content-Type: text/plain")
incoming_message = message_from_string(message_as_string)
incoming_message = message_from_string(message_as_string, policy=email.policy.default)
assert isinstance(incoming_message, EmailMessage) # https://github.com/python/typeshed/issues/2417
user_profile = self.example_user('hamlet')
self.login_user(user_profile)
@ -1255,7 +1278,8 @@ class TestContentTypeUnspecifiedCharset(ZulipTestCase):
class TestEmailMirrorProcessMessageNoValidRecipient(ZulipTestCase):
def test_process_message_no_valid_recipient(self) -> None:
incoming_valid_message = MIMEText('Test Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('Test Body')
incoming_valid_message['Subject'] = "Test Subject"
incoming_valid_message['From'] = self.example_email('hamlet')
incoming_valid_message['To'] = "address@wrongdomain, address@notzulip"
@ -1277,7 +1301,8 @@ class TestEmailMirrorLogAndReport(ZulipTestCase):
address_parts = stream_to_address.split('@')
scrubbed_address = 'X'*len(address_parts[0]) + '@' + address_parts[1]
incoming_valid_message = MIMEText('Test Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('Test Body')
incoming_valid_message['Subject'] = "Test Subject"
incoming_valid_message['From'] = self.example_email('hamlet')
incoming_valid_message['To'] = stream_to_address
@ -1304,7 +1329,8 @@ class TestEmailMirrorLogAndReport(ZulipTestCase):
@mock.patch('zerver.lib.email_mirror.logger.error')
def test_log_and_report_no_errorbot(self, mock_error: mock.MagicMock) -> None:
with self.settings(ERROR_BOT=None):
incoming_valid_message = MIMEText('Test Body')
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content('Test Body')
incoming_valid_message['Subject'] = "Test Subject"
incoming_valid_message['From'] = self.example_email('hamlet')
log_and_report(incoming_valid_message, "test error message", None)

View File

@ -1,6 +1,6 @@
import random
import re
from email.utils import formataddr
from email.headerregistry import Address
from typing import List, Sequence
from unittest.mock import patch
@ -230,11 +230,11 @@ class TestMissedMessages(ZulipTestCase):
handle_missedmessage_emails(hamlet.id, [{'message_id': msg_id, 'trigger': trigger}])
if settings.EMAIL_GATEWAY_PATTERN != "":
reply_to_addresses = [settings.EMAIL_GATEWAY_PATTERN % (t,) for t in tokens]
reply_to_emails = [formataddr(("Zulip", address)) for address in reply_to_addresses]
reply_to_emails = [str(Address(display_name="Zulip", addr_spec=address)) for address in reply_to_addresses]
else:
reply_to_emails = ["noreply@testserver"]
msg = mail.outbox[0]
from_email = formataddr(("Zulip missed messages", FromAddress.NOREPLY))
from_email = str(Address(display_name="Zulip missed messages", addr_spec=FromAddress.NOREPLY))
self.assertEqual(len(mail.outbox), 1)
if send_as_user:
from_email = f'"{othello.full_name}" <{othello.email}>'

View File

@ -2,7 +2,6 @@ import glob
import os
import re
from datetime import timedelta
from email.utils import parseaddr
from typing import Any, Dict, List, Optional
from unittest import mock
from unittest.mock import MagicMock, call, patch
@ -337,10 +336,10 @@ class TestPasswordRestEmail(ZulipTestCase):
def test_if_command_sends_password_reset_email(self) -> None:
call_command(self.COMMAND_NAME, users=self.example_email("iago"))
from django.core.mail import outbox
from_email = outbox[0].from_email
self.assertIn("Zulip Account Security", from_email)
tokenized_no_reply_email = parseaddr(from_email)[1]
self.assertTrue(re.search(self.TOKENIZED_NOREPLY_REGEX, tokenized_no_reply_email))
self.assertRegex(
outbox[0].from_email,
fr"^Zulip Account Security <{self.TOKENIZED_NOREPLY_REGEX}>\Z",
)
self.assertIn("reset your password", outbox[0].body)
class TestRealmReactivationEmail(ZulipTestCase):

View File

@ -1,6 +1,5 @@
import datetime
import re
from email.utils import parseaddr
from typing import Any, Dict, List, Mapping
from unittest import mock
@ -244,10 +243,10 @@ class RealmTest(ZulipTestCase):
do_send_realm_reactivation_email(realm)
from django.core.mail import outbox
self.assertEqual(len(outbox), 1)
from_email = outbox[0].from_email
tokenized_no_reply_email = parseaddr(from_email)[1]
self.assertIn("Zulip Account Security", from_email)
self.assertTrue(re.search(self.TOKENIZED_NOREPLY_REGEX, tokenized_no_reply_email))
self.assertRegex(
outbox[0].from_email,
fr"^Zulip Account Security <{self.TOKENIZED_NOREPLY_REGEX}>\Z",
)
self.assertIn('Reactivate your Zulip organization', outbox[0].subject)
self.assertIn('Dear former administrators', outbox[0].body)
admins = realm.get_human_admin_users()

View File

@ -3,7 +3,6 @@ import re
import smtplib
import time
import urllib
from email.utils import parseaddr
from typing import Any, List, Optional, Sequence
from unittest.mock import MagicMock, patch
@ -238,6 +237,15 @@ class PasswordResetTest(ZulipTestCase):
Log in, reset password, log out, log in with new password.
"""
def get_reset_mail_body(self) -> str:
from django.core.mail import outbox
[message] = outbox
self.assertRegex(
message.from_email,
fr"^Zulip Account Security <{self.TOKENIZED_NOREPLY_REGEX}>\Z",
)
return message.body
def test_password_reset(self) -> None:
user = self.example_user("hamlet")
email = user.delivery_email
@ -261,12 +269,8 @@ class PasswordResetTest(ZulipTestCase):
self.assert_in_response("Check your email in a few minutes to finish the process.", result)
# Check that the password reset email is from a noreply address.
from django.core.mail import outbox
from_email = outbox[0].from_email
self.assertIn("Zulip Account Security", from_email)
tokenized_no_reply_email = parseaddr(from_email)[1]
self.assertTrue(re.search(self.TOKENIZED_NOREPLY_REGEX, tokenized_no_reply_email))
self.assertIn("reset your password", outbox[0].body)
body = self.get_reset_mail_body()
self.assertIn("reset your password", body)
# Visit the password reset link.
password_reset_url = self.get_confirmation_url_from_outbox(
@ -318,16 +322,12 @@ class PasswordResetTest(ZulipTestCase):
self.assert_in_response("Check your email in a few minutes to finish the process.", result)
# Check that the password reset email is from a noreply address.
from django.core.mail import outbox
from_email = outbox[0].from_email
self.assertIn("Zulip Account Security", from_email)
tokenized_no_reply_email = parseaddr(from_email)[1]
self.assertTrue(re.search(self.TOKENIZED_NOREPLY_REGEX, tokenized_no_reply_email))
self.assertIn('Somebody (possibly you) requested a new password', outbox[0].body)
self.assertIn('You do not have an account', outbox[0].body)
self.assertIn('safely ignore', outbox[0].body)
self.assertNotIn('reset your password', outbox[0].body)
self.assertNotIn('deactivated', outbox[0].body)
body = self.get_reset_mail_body()
self.assertIn('Somebody (possibly you) requested a new password', body)
self.assertIn('You do not have an account', body)
self.assertIn('safely ignore', body)
self.assertNotIn('reset your password', body)
self.assertNotIn('deactivated', body)
def test_password_reset_for_deactivated_user(self) -> None:
user_profile = self.example_user("hamlet")
@ -346,16 +346,12 @@ class PasswordResetTest(ZulipTestCase):
self.assert_in_response("Check your email in a few minutes to finish the process.", result)
# Check that the password reset email is from a noreply address.
from django.core.mail import outbox
from_email = outbox[0].from_email
self.assertIn("Zulip Account Security", from_email)
tokenized_no_reply_email = parseaddr(from_email)[1]
self.assertTrue(re.search(self.TOKENIZED_NOREPLY_REGEX, tokenized_no_reply_email))
self.assertIn('Somebody (possibly you) requested a new password', outbox[0].body)
self.assertIn('has been deactivated', outbox[0].body)
self.assertIn('safely ignore', outbox[0].body)
self.assertNotIn('reset your password', outbox[0].body)
self.assertNotIn('not have an account', outbox[0].body)
body = self.get_reset_mail_body()
self.assertIn('Somebody (possibly you) requested a new password', body)
self.assertIn('has been deactivated', body)
self.assertIn('safely ignore', body)
self.assertNotIn('reset your password', body)
self.assertNotIn('not have an account', body)
def test_password_reset_with_deactivated_realm(self) -> None:
user_profile = self.example_user("hamlet")
@ -426,17 +422,14 @@ class PasswordResetTest(ZulipTestCase):
self.assert_in_response("Check your email in a few minutes to finish the process.", result)
from django.core.mail import outbox
self.assertEqual(len(outbox), 1)
tokenized_no_reply_email = parseaddr(outbox[0].from_email)[1]
self.assertTrue(re.search(self.TOKENIZED_NOREPLY_REGEX, tokenized_no_reply_email))
self.assertIn('Somebody (possibly you) requested a new password', outbox[0].body)
self.assertIn('You do not have an account', outbox[0].body)
body = self.get_reset_mail_body()
self.assertIn('Somebody (possibly you) requested a new password', body)
self.assertIn('You do not have an account', body)
self.assertIn("active accounts in the following organization(s).\nhttp://zulip.testserver",
outbox[0].body)
self.assertIn('safely ignore', outbox[0].body)
self.assertNotIn('reset your password', outbox[0].body)
self.assertNotIn('deactivated', outbox[0].body)
body)
self.assertIn('safely ignore', body)
self.assertNotIn('reset your password', body)
self.assertNotIn('deactivated', body)
def test_invalid_subdomain(self) -> None:
email = self.example_email("hamlet")
@ -499,12 +492,8 @@ class PasswordResetTest(ZulipTestCase):
"/accounts/password/reset/done/"))
result = self.client_get(result["Location"])
self.assertEqual(len(outbox), 1)
message = outbox.pop()
tokenized_no_reply_email = parseaddr(message.from_email)[1]
self.assertTrue(re.search(self.TOKENIZED_NOREPLY_REGEX, tokenized_no_reply_email))
self.assertIn('reset your password',
message.body)
body = self.get_reset_mail_body()
self.assertIn('reset your password', body)
def test_redirect_endpoints(self) -> None:
'''
@ -769,8 +758,7 @@ class InviteUserBase(ZulipTestCase):
if custom_from_name is not None:
self.assertIn(custom_from_name, outbox[0].from_email)
tokenized_no_reply_email = parseaddr(outbox[0].from_email)[1]
self.assertTrue(re.search(self.TOKENIZED_NOREPLY_REGEX, tokenized_no_reply_email))
self.assertRegex(outbox[0].from_email, fr" <{self.TOKENIZED_NOREPLY_REGEX}>\Z")
def invite(self, invitee_emails: str, stream_names: Sequence[str], body: str='',
invite_as: int=1) -> HttpResponse:

View File

@ -1,5 +1,5 @@
import datetime
from email.utils import parseaddr
from email.headerregistry import Address
from typing import Any, Dict, Iterable, List, Mapping, Optional, TypeVar, Union
from unittest import mock
@ -1250,8 +1250,13 @@ class ActivateTest(ZulipTestCase):
from django.core.mail import outbox
self.assertEqual(len(outbox), 1)
for message in outbox:
to_fields = [parseaddr(to_field)[1] for to_field in message.to]
self.assertEqual({hamlet.delivery_email, iago.delivery_email}, set(to_fields))
self.assertEqual(
set(message.to),
{
str(Address(display_name=hamlet.full_name, addr_spec=hamlet.delivery_email)),
str(Address(display_name=iago.full_name, addr_spec=iago.delivery_email)),
},
)
self.assertEqual(ScheduledEmail.objects.count(), 0)
class RecipientInfoTest(ZulipTestCase):

View File

@ -2,6 +2,7 @@
import copy
import datetime
import email
import email.policy
import logging
import os
import signal
@ -12,6 +13,7 @@ import time
import urllib
from abc import ABC, abstractmethod
from collections import defaultdict, deque
from email.message import EmailMessage
from functools import wraps
from threading import Timer
from typing import (
@ -552,6 +554,8 @@ class DigestWorker(QueueProcessingWorker): # nocoverage
class MirrorWorker(QueueProcessingWorker):
def consume(self, event: Mapping[str, Any]) -> None:
rcpt_to = event['rcpt_to']
msg = email.message_from_string(event["message"], policy=email.policy.default)
assert isinstance(msg, EmailMessage) # https://github.com/python/typeshed/issues/2417
if not is_missed_message_address(rcpt_to):
# Missed message addresses are one-time use, so we don't need
# to worry about emails to them resulting in message spam.
@ -559,14 +563,12 @@ class MirrorWorker(QueueProcessingWorker):
try:
rate_limit_mirror_by_realm(recipient_realm)
except RateLimited:
msg = email.message_from_string(event["message"])
logger.warning("MirrorWorker: Rejecting an email from: %s "
"to realm: %s - rate limited.",
msg['From'], recipient_realm.name)
return
mirror_email(email.message_from_string(event["message"]),
rcpt_to=rcpt_to)
mirror_email(msg, rcpt_to=rcpt_to)
@assign_queue('test', queue_type="test")
class TestWorker(QueueProcessingWorker):

View File

@ -1,8 +1,7 @@
import configparser
import logging
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.message import EmailMessage
from typing import List
from django.conf import settings
@ -35,7 +34,7 @@ class EmailLogBackEnd(BaseEmailBackend):
from_email = email.from_email
to = get_forward_address()
msg = MIMEMultipart('alternative')
msg = EmailMessage()
msg['Subject'] = email.subject
msg['From'] = from_email
msg['To'] = to
@ -50,13 +49,13 @@ class EmailLogBackEnd(BaseEmailBackend):
czo_email_images_base_uri = 'https://chat.zulip.org/static/images/emails'
html = html.replace(localhost_email_images_base_uri, czo_email_images_base_uri)
msg.attach(MIMEText(text, 'plain'))
msg.attach(MIMEText(html, 'html'))
msg.add_alternative(text, subtype="plain")
msg.add_alternative(html, subtype="html")
smtp = smtplib.SMTP(settings.EMAIL_HOST)
smtp.starttls()
smtp.login(settings.EMAIL_HOST_USER, settings.EMAIL_HOST_PASSWORD)
smtp.sendmail(from_email, to, msg.as_string())
smtp.send_message(msg)
smtp.quit()
def log_email(self, email: EmailMultiAlternatives) -> None: