python: Use a real parser for email addresses.

Now that we can assume Python 3.6+, we can use the
email.headerregistry module to replace hacky manual email address
parsing.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2022-07-27 14:33:49 -07:00 committed by Tim Abbott
parent 8c2d478e6a
commit b945aa3443
31 changed files with 165 additions and 139 deletions

View File

@ -2,6 +2,8 @@
const {strict: assert} = require("assert");
const {parseOneAddress} = require("email-addresses");
const {mock_esm, with_overrides, zrequire} = require("../zjsunit/namespace");
const {run_test} = require("../zjsunit/test");
const blueslip = require("../zjsunit/zblueslip");
@ -1575,7 +1577,7 @@ test("navbar_helpers", () => {
icon: "envelope",
title: properly_separated_names([joe.full_name]),
redirect_url_with_search:
"/#narrow/pm-with/" + joe.user_id + "-" + joe.email.split("@")[0],
"/#narrow/pm-with/" + joe.user_id + "-" + parseOneAddress(joe.email).local,
},
{
operator: group_pm,
@ -1610,13 +1612,13 @@ test("navbar_helpers", () => {
{
operator: sender_me,
redirect_url_with_search:
"/#narrow/sender/" + me.user_id + "-" + me.email.split("@")[0],
"/#narrow/sender/" + me.user_id + "-" + parseOneAddress(me.email).local,
is_common_narrow: false,
},
{
operator: sender_joe,
redirect_url_with_search:
"/#narrow/sender/" + joe.user_id + "-" + joe.email.split("@")[0],
"/#narrow/sender/" + joe.user_id + "-" + parseOneAddress(joe.email).local,
is_common_narrow: false,
},
];

View File

@ -33,6 +33,7 @@
"css.escape": "^1.5.1",
"date-fns": "^2.16.1",
"date-fns-tz": "^1.1.1",
"email-addresses": "^5.0.0",
"emoji-datasource-google": "^14.0.0",
"emoji-datasource-google-blob": "npm:emoji-datasource-google@^3.0.0",
"emoji-datasource-twitter": "^14.0.0",

View File

@ -1,5 +1,6 @@
import md5 from "blueimp-md5";
import {format, utcToZonedTime} from "date-fns-tz";
import {parseOneAddress} from "email-addresses";
import * as typeahead from "../shared/js/typeahead";
@ -508,7 +509,7 @@ export function pm_with_url(message) {
} else {
const person = get_by_user_id(user_ids[0]);
if (person && person.email) {
suffix = person.email.split("@")[0].toLowerCase();
suffix = parseOneAddress(person.email).local.toLowerCase();
} else {
blueslip.error("Unknown people in message");
suffix = "unk";
@ -582,7 +583,7 @@ export function emails_to_slug(emails_string) {
const emails = emails_string.split(",");
if (emails.length === 1) {
slug += emails[0].split("@")[0].toLowerCase();
slug += parseOneAddress(emails[0]).local.toLowerCase();
} else {
slug += "group";
}

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import os
import sys
from email.headerregistry import Address
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from scripts.lib.setup_path import setup_path
@ -32,7 +33,7 @@ def main() -> None:
session = boto3.Session()
from_address = settings.NOREPLY_EMAIL_ADDRESS
from_host = from_address.split("@")[1]
from_host = Address(addr_spec=from_address).domain
ses: SESClient = session.client("ses")
possible_identities = []

View File

@ -48,4 +48,4 @@ API_FEATURE_LEVEL = 133
# historical commits sharing the same major version, in which case a
# minor version bump suffices.
PROVISION_VERSION = (195, 0)
PROVISION_VERSION = (195, 1)

View File

@ -4191,6 +4191,11 @@ elementary-circuits-directed-graph@^1.0.4:
dependencies:
strongly-connected-components "^1.0.1"
email-addresses@^5.0.0:
version "5.0.0"
resolved "https://registry.yarnpkg.com/email-addresses/-/email-addresses-5.0.0.tgz#7ae9e7f58eef7d5e3e2c2c2d3ea49b78dc854fa6"
integrity sha512-4OIPYlA6JXqtVn8zpHpGiI7vE6EQOAg16aGnDMIAlZVinnoZ8208tW1hAbjWydgN/4PLTT9q+O1K6AH/vALJGw==
"emoji-datasource-google-blob@npm:emoji-datasource-google@^3.0.0":
version "3.0.0"
resolved "https://registry.yarnpkg.com/emoji-datasource-google/-/emoji-datasource-google-3.0.0.tgz#d6f77b56385338e10667d2b150dbe9f9b5a4e921"

View File

@ -1,6 +1,7 @@
import datetime
import logging
from collections import defaultdict
from email.headerregistry import Address
from typing import (
TYPE_CHECKING,
AbstractSet,
@ -92,11 +93,11 @@ if TYPE_CHECKING:
def compute_irc_user_fullname(email: str) -> str:
return email.split("@")[0] + " (IRC)"
return Address(addr_spec=email).username + " (IRC)"
def compute_jabber_user_fullname(email: str) -> str:
return email.split("@")[0] + " (XMPP)"
return Address(addr_spec=email).username + " (XMPP)"
def get_user_profile_delivery_email_cache_key(

View File

@ -1,3 +1,4 @@
from email.headerregistry import Address
from typing import Any, Dict, Literal, Optional
import orjson
@ -370,7 +371,9 @@ def do_scrub_realm(realm: Realm, *, acting_user: Optional[UserProfile]) -> None:
do_delete_messages_by_sender(user)
do_delete_avatar_image(user, acting_user=acting_user)
user.full_name = f"Scrubbed {generate_key()[:15]}"
scrubbed_email = f"scrubbed-{generate_key()[:15]}@{realm.host}"
scrubbed_email = Address(
username=f"scrubbed-{generate_key()[:15]}", domain=realm.host
).addr_spec
user.email = scrubbed_email
user.delivery_email = scrubbed_email
user.save(update_fields=["full_name", "email", "delivery_email"])

View File

@ -1,4 +1,5 @@
from collections import defaultdict
from email.headerregistry import Address
from typing import Any, Dict, List, Optional
import orjson
@ -67,7 +68,9 @@ def do_delete_user(user_profile: UserProfile, *, acting_user: Optional[UserProfi
personal_recipient.delete()
replacement_user = create_user(
force_id=user_id,
email=f"deleteduser{user_id}@{get_fake_email_domain(realm)}",
email=Address(
username=f"deleteduser{user_id}", domain=get_fake_email_domain(realm)
).addr_spec,
password=None,
realm=realm,
full_name=f"Deleted User {user_id}",

View File

@ -6,6 +6,7 @@ import shutil
import subprocess
import zipfile
from collections import defaultdict
from email.headerregistry import Address
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Type, TypeVar
import orjson
@ -78,13 +79,16 @@ class SlackBotEmail:
else:
raise AssertionError("Could not identify bot type")
email = slack_bot_name.replace("Bot", "").replace(" ", "").lower() + f"-bot@{domain_name}"
email = Address(
username=slack_bot_name.replace("Bot", "").replace(" ", "").lower() + "-bot",
domain=domain_name,
).addr_spec
if email in cls.duplicate_email_count:
cls.duplicate_email_count[email] += 1
email_prefix, email_suffix = email.split("@")
email_prefix += "-" + str(cls.duplicate_email_count[email])
email = "@".join([email_prefix, email_suffix])
address = Address(addr_spec=email)
email_username = address.username + "-" + str(cls.duplicate_email_count[email])
email = Address(username=email_username, domain=address.domain).addr_spec
else:
cls.duplicate_email_count[email] = 1
@ -400,11 +404,11 @@ def get_user_email(user: ZerverFieldsT, domain_name: str) -> str:
if "email" in user["profile"]:
return user["profile"]["email"]
if user["is_mirror_dummy"]:
return "{}@{}.slack.com".format(user["name"], user["team_domain"])
return Address(username=user["name"], domain=f'{user["team_domain"]}.slack.com').addr_spec
if "bot_id" in user["profile"]:
return SlackBotEmail.get_email(user["profile"], domain_name)
if get_user_full_name(user).lower() == "slackbot":
return f"imported-slackbot-bot@{domain_name}"
return Address(username="imported-slackbot-bot", domain=domain_name).addr_spec
raise AssertionError(f"Could not find email address for Slack user {user}")

View File

@ -1,5 +1,6 @@
import logging
import re
from email.headerregistry import Address
from typing import Any, Dict, List, Optional, Tuple
import DNS
@ -39,7 +40,6 @@ from zerver.models import (
EmailContainsPlusError,
Realm,
UserProfile,
email_to_domain,
get_realm,
get_user_by_delivery_email,
is_cross_realm_bot_email,
@ -68,11 +68,11 @@ PASSWORD_TOO_WEAK_ERROR = gettext_lazy("The password is too weak.")
def email_is_not_mit_mailing_list(email: str) -> None:
"""Prevent MIT mailing lists from signing up for Zulip"""
if "@mit.edu" in email:
username = email.rsplit("@", 1)[0]
address = Address(addr_spec=email)
if address.domain == "mit.edu":
# Check whether the user exists and can get mail.
try:
DNS.dnslookup(f"{username}.pobox.ns.athena.mit.edu", DNS.Type.TXT)
DNS.dnslookup(f"{address.username}.pobox.ns.athena.mit.edu", DNS.Type.TXT)
except DNS.Base.ServerError as e:
if e.rcode == DNS.Status.NXDOMAIN:
# This error is mark_safe only because 1. it needs to render HTML
@ -240,7 +240,7 @@ def email_not_system_bot(email: str) -> None:
def email_is_not_disposable(email: str) -> None:
if is_disposable_domain(email_to_domain(email)):
if is_disposable_domain(Address(addr_spec=email).domain):
raise ValidationError(_("Please use your real email address."))

View File

@ -1,4 +1,5 @@
from datetime import datetime
from email.headerregistry import Address
from typing import Optional, Union
import orjson
@ -62,7 +63,9 @@ def copy_default_settings(
def get_display_email_address(user_profile: UserProfile) -> str:
if not user_profile.email_address_is_realm_public():
return f"user{user_profile.id}@{get_fake_email_domain(user_profile.realm)}"
return Address(
username=f"user{user_profile.id}", domain=get_fake_email_domain(user_profile.realm)
).addr_spec
return user_profile.delivery_email

View File

@ -1,6 +1,7 @@
import glob
import logging
import os
from email.headerregistry import Address
from typing import Any, Dict, List, Optional
from django.conf import settings
@ -30,7 +31,7 @@ def generate_dev_ldap_dir(mode: str, num_users: int = 8) -> Dict[str, Dict[str,
ldap_dir = {}
for i, user_data in enumerate(ldap_data):
email = user_data[1].lower()
email_username = email.split("@")[0]
email_username = Address(addr_spec=email).username
common_data = {
"cn": [user_data[0]],
"userPassword": [email_username],

View File

@ -1,9 +1,9 @@
import logging
import re
import secrets
from email.headerregistry import AddressHeader
from email.headerregistry import Address, AddressHeader
from email.message import EmailMessage
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Match, Optional, Tuple
from django.conf import settings
from django.utils.timezone import now as timezone_now
@ -49,35 +49,27 @@ logger = logging.getLogger(__name__)
def redact_email_address(error_message: str) -> str:
if not settings.EMAIL_GATEWAY_EXTRA_PATTERN_HACK:
domain = settings.EMAIL_GATEWAY_PATTERN.rsplit("@")[-1]
domain = Address(addr_spec=settings.EMAIL_GATEWAY_PATTERN).domain
else:
# EMAIL_GATEWAY_EXTRA_PATTERN_HACK is of the form '@example.com'
domain = settings.EMAIL_GATEWAY_EXTRA_PATTERN_HACK[1:]
address_match = re.search("\\b(\\S*?)@" + domain, error_message)
if address_match:
email_address = address_match.group(0)
def redact(address_match: Match[str]) -> str:
email_address = address_match[0]
# Annotate basic info about the address before scrubbing:
if is_missed_message_address(email_address):
redacted_message = error_message.replace(
email_address, f"{email_address} <Missed message address>"
)
annotation = " <Missed message address>"
else:
try:
target_stream_id = decode_stream_email_address(email_address)[0].id
annotated_address = f"{email_address} <Address to stream id: {target_stream_id}>"
redacted_message = error_message.replace(email_address, annotated_address)
annotation = f" <Address to stream id: {target_stream_id}>"
except ZulipEmailForwardError:
redacted_message = error_message.replace(
email_address, f"{email_address} <Invalid address>"
)
annotation = " <Invalid address>"
# Scrub the address from the message, to the form XXXXX@example.com:
string_to_scrub = address_match.groups()[0]
redacted_message = redacted_message.replace(string_to_scrub, "X" * len(string_to_scrub))
return redacted_message
return "X" * len(address_match[1]) + address_match[2] + annotation
return error_message
return re.sub(rf"\b(\S*?)(@{re.escape(domain)})", redact, error_message)
def report_to_zulip(error_message: str) -> None:

View File

@ -1,3 +1,4 @@
from email.headerregistry import Address
from typing import Callable, Dict, Optional, Set, Tuple
from django.core import validators
@ -13,15 +14,13 @@ from zerver.models import (
EmailContainsPlusError,
Realm,
RealmDomain,
email_to_domain,
email_to_username,
get_users_by_delivery_email,
is_cross_realm_bot_email,
)
def validate_disposable(email: str) -> None:
if is_disposable_domain(email_to_domain(email)):
if is_disposable_domain(Address(addr_spec=email).domain):
raise DisposableEmailError
@ -59,10 +58,11 @@ def get_realm_email_validator(realm: Realm) -> Callable[[str], None]:
a small whitelist.
"""
if "+" in email_to_username(email):
address = Address(addr_spec=email)
if "+" in address.username:
raise EmailContainsPlusError
domain = email_to_domain(email)
domain = address.domain
if domain in allowed_domains:
return

View File

@ -1,3 +1,4 @@
from email.headerregistry import Address
from typing import Any, Dict, List
from urllib.parse import quote, urlsplit
@ -20,7 +21,7 @@ def encode_stream(stream_id: int, stream_name: str) -> str:
def personal_narrow_url(realm: Realm, sender: UserProfile) -> str:
base_url = f"{realm.uri}/#narrow/pm-with/"
email_user = sender.email.split("@")[0].lower()
email_user = Address(addr_spec=sender.email).username.lower()
pm_slug = str(sender.id) + "-" + hash_util_encode(email_user)
return base_url + pm_slug

View File

@ -1,3 +1,4 @@
from email.headerregistry import Address
from typing import Any
from django.conf import settings
@ -23,14 +24,14 @@ def get_fake_email_domain(realm: Any) -> str:
try:
# Check that realm.host can be used to form valid email addresses.
realm_host = host_for_subdomain(realm.string_id)
validate_email(f"bot@{realm_host}")
validate_email(Address(username="bot", domain=realm_host).addr_spec)
return realm_host
except ValidationError:
pass
try:
# Check that the fake email domain can be used to form valid email addresses.
validate_email("bot@" + settings.FAKE_EMAIL_DOMAIN)
validate_email(Address(username="bot", domain=settings.FAKE_EMAIL_DOMAIN).addr_spec)
except ValidationError:
raise Exception(
settings.FAKE_EMAIL_DOMAIN + " is not a valid domain. "
@ -61,9 +62,10 @@ def fix_dummy_users(apps: StateApps, schema_editor: BaseDatabaseSchemaEditor) ->
try:
validate_email(user_profile.delivery_email)
except ValidationError:
user_profile.delivery_email = (
f"deleteduser{user_profile.id}@{get_fake_email_domain(user_profile.realm)}"
)
user_profile.delivery_email = Address(
username=f"deleteduser{user_profile.id}",
domain=get_fake_email_domain(user_profile.realm),
).addr_spec
update_fields.append("delivery_email")

View File

@ -3,6 +3,7 @@ import re
import secrets
import time
from datetime import timedelta
from email.headerregistry import Address
from typing import (
TYPE_CHECKING,
AbstractSet,
@ -1043,21 +1044,6 @@ class RealmDomain(models.Model):
unique_together = ("realm", "domain")
# These functions should only be used on email addresses that have
# been validated via django.core.validators.validate_email
#
# Note that we need to use some care, since can you have multiple @-signs; e.g.
# "tabbott@test"@zulip.com
# is valid email address
def email_to_username(email: str) -> str:
return "@".join(email.split("@")[:-1]).lower()
# Returns the raw domain portion of the desired email address
def email_to_domain(email: str) -> str:
return email.split("@")[-1].lower()
class DomainNotAllowedForRealmError(Exception):
pass
@ -2190,7 +2176,7 @@ class GroupGroupMembership(models.Model):
def remote_user_to_email(remote_user: str) -> str:
if settings.SSO_APPEND_DOMAIN is not None:
remote_user += "@" + settings.SSO_APPEND_DOMAIN
return Address(username=remote_user, domain=settings.SSO_APPEND_DOMAIN).addr_spec
return remote_user
@ -4610,14 +4596,14 @@ class InvalidFakeEmailDomain(Exception):
def get_fake_email_domain(realm: Realm) -> str:
try:
# Check that realm.host can be used to form valid email addresses.
validate_email(f"bot@{realm.host}")
validate_email(Address(username="bot", domain=realm.host).addr_spec)
return realm.host
except ValidationError:
pass
try:
# Check that the fake email domain can be used to form valid email addresses.
validate_email("bot@" + settings.FAKE_EMAIL_DOMAIN)
validate_email(Address(username="bot", domain=settings.FAKE_EMAIL_DOMAIN).addr_spec)
except ValidationError:
raise InvalidFakeEmailDomain(
settings.FAKE_EMAIL_DOMAIN + " is not a valid domain. "

View File

@ -15,6 +15,7 @@
import json
import os
import sys
from email.headerregistry import Address
from functools import wraps
from typing import Any, Callable, Dict, List, Set, TypeVar, cast
@ -58,7 +59,10 @@ def ensure_users(ids_list: List[int], user_names: List[str]) -> None:
# Ensure that the list of user ids (ids_list)
# matches the users we want to refer to (user_names).
realm = get_realm("zulip")
user_ids = [get_user(name + "@zulip.com", realm).id for name in user_names]
user_ids = [
get_user(Address(username=name, domain="zulip.com").addr_spec, realm).id
for name in user_names
]
assert ids_list == user_ids

View File

@ -9,6 +9,7 @@ import time
import urllib
from abc import ABC, abstractmethod
from contextlib import contextmanager
from email.headerregistry import Address
from typing import (
TYPE_CHECKING,
Any,
@ -107,7 +108,6 @@ from zerver.models import (
Stream,
UserProfile,
clear_supported_auth_backends_cache,
email_to_username,
get_realm,
get_user_by_delivery_email,
)
@ -161,12 +161,8 @@ if TYPE_CHECKING:
class AuthBackendTest(ZulipTestCase):
def get_username(self, email_to_username: Optional[Callable[[str], str]] = None) -> str:
username = self.example_email("hamlet")
if email_to_username is not None:
username = email_to_username(self.example_email("hamlet"))
return username
def get_email(self) -> str:
return self.example_email("hamlet")
def verify_backend(
self,
@ -266,7 +262,7 @@ class AuthBackendTest(ZulipTestCase):
def test_dummy_backend(self) -> None:
realm = get_realm("zulip")
username = self.get_username()
username = self.get_email()
self.verify_backend(
ZulipDummyBackend(),
good_kwargs=dict(username=username, realm=realm, use_dummy_backend=True),
@ -279,7 +275,7 @@ class AuthBackendTest(ZulipTestCase):
realm.save()
def test_email_auth_backend(self) -> None:
username = self.get_username()
username = self.get_email()
user_profile = self.example_user("hamlet")
password = "testpassword"
user_profile.set_password(password)
@ -447,7 +443,7 @@ class AuthBackendTest(ZulipTestCase):
password = self.ldap_password("hamlet")
self.setup_subdomain(user_profile)
username = self.get_username()
username = self.get_email()
backend = ZulipLDAPAuthBackend()
# Test LDAP auth fails when LDAP server rejects password
@ -479,13 +475,13 @@ class AuthBackendTest(ZulipTestCase):
def test_devauth_backend(self) -> None:
self.verify_backend(
DevAuthBackend(),
good_kwargs=dict(dev_auth_username=self.get_username(), realm=get_realm("zulip")),
bad_kwargs=dict(dev_auth_username=self.get_username(), realm=get_realm("zephyr")),
good_kwargs=dict(dev_auth_username=self.get_email(), realm=get_realm("zulip")),
bad_kwargs=dict(dev_auth_username=self.get_email(), realm=get_realm("zephyr")),
)
@override_settings(AUTHENTICATION_BACKENDS=("zproject.backends.ZulipRemoteUserBackend",))
def test_remote_user_backend(self) -> None:
username = self.get_username()
username = self.get_email()
self.verify_backend(
ZulipRemoteUserBackend(),
good_kwargs=dict(remote_user=username, realm=get_realm("zulip")),
@ -494,7 +490,7 @@ class AuthBackendTest(ZulipTestCase):
@override_settings(AUTHENTICATION_BACKENDS=("zproject.backends.ZulipRemoteUserBackend",))
def test_remote_user_backend_invalid_realm(self) -> None:
username = self.get_username()
username = self.get_email()
self.verify_backend(
ZulipRemoteUserBackend(),
good_kwargs=dict(remote_user=username, realm=get_realm("zulip")),
@ -504,7 +500,7 @@ class AuthBackendTest(ZulipTestCase):
@override_settings(AUTHENTICATION_BACKENDS=("zproject.backends.ZulipRemoteUserBackend",))
@override_settings(SSO_APPEND_DOMAIN="zulip.com")
def test_remote_user_backend_sso_append_domain(self) -> None:
username = self.get_username(email_to_username)
username = Address(addr_spec=self.get_email()).username
self.verify_backend(
ZulipRemoteUserBackend(),
good_kwargs=dict(remote_user=username, realm=get_realm("zulip")),
@ -5277,7 +5273,7 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
def test_login_mobile_flow_otp_success_username(self) -> None:
user_profile = self.example_user("hamlet")
email = user_profile.delivery_email
remote_user = email_to_username(email)
remote_user = Address(addr_spec=email).username
user_profile.date_joined = timezone_now() - datetime.timedelta(seconds=61)
user_profile.save()
mobile_flow_otp = "1234abcd" * 8
@ -5363,7 +5359,7 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
def test_login_desktop_flow_otp_success_username(self) -> None:
user_profile = self.example_user("hamlet")
email = user_profile.delivery_email
remote_user = email_to_username(email)
remote_user = Address(addr_spec=email).username
user_profile.date_joined = timezone_now() - datetime.timedelta(seconds=61)
user_profile.save()
desktop_flow_otp = "1234abcd" * 8
@ -5540,10 +5536,10 @@ class DjangoToLDAPUsernameTests(ZulipTestCase):
self.mock_ldap.directory['uid="hamlet@test",ou=users,dc=zulip,dc=com'] = {
"cn": ["King Hamlet"],
"uid": ['"hamlet@test"'],
"uid": ["hamlet@test"],
}
username = self.backend.django_to_ldap_username('"hamlet@test"@zulip.com')
self.assertEqual(username, '"hamlet@test"')
self.assertEqual(username, "hamlet@test")
self.mock_ldap.directory['uid="hamlet@test"@zulip,ou=users,dc=zulip,dc=com'] = {
"cn": ["King Hamlet"],
@ -5857,7 +5853,7 @@ class TestLDAP(ZulipLDAPTestCase):
backend = self.backend
with self.settings(LDAP_APPEND_DOMAIN="zulip.com"):
username = backend.user_email_from_ldapuser(
"this_argument_is_ignored", _LDAPUser(self.backend, username='"hamlet@test"')
"this_argument_is_ignored", _LDAPUser(self.backend, username="hamlet@test")
)
self.assertEqual(username, '"hamlet@test"@zulip.com')

View File

@ -90,7 +90,7 @@ class BotTest(ZulipTestCase, UploadSerializeMixin):
# Invalid username
bot_info = dict(
full_name="My bot name",
short_name="@",
short_name="my\nbot",
)
result = self.client_post("/json/bots", bot_info)
self.assert_json_error(result, "Bad name or username")

View File

@ -3,6 +3,7 @@ import email.policy
import os
import subprocess
from email import message_from_string
from email.headerregistry import Address
from email.message import EmailMessage, MIMEPart
from typing import TYPE_CHECKING, Any, Callable, Dict, Mapping, Optional
from unittest import mock
@ -237,12 +238,11 @@ class TestStreamEmailMessagesSuccess(ZulipTestCase):
def create_incoming_valid_message(
self, msgtext: str, stream: Stream, include_quotes: bool
) -> EmailMessage:
stream_to_address = encode_email_address(stream)
parts = stream_to_address.split("@")
parts[0] += "+show-sender"
address = Address(addr_spec=encode_email_address(stream))
email_username = address.username + "+show-sender"
if include_quotes:
parts[0] += "+include-quotes"
stream_to_address = "@".join(parts)
email_username += "+include-quotes"
stream_to_address = Address(username=email_username, domain=address.domain).addr_spec
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content(msgtext)
@ -459,10 +459,9 @@ and other things
self.subscribe(user_profile, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
parts = stream_to_address.split("@")
parts[0] += "+show-sender"
stream_to_address = "@".join(parts)
address = Address(addr_spec=encode_email_address(stream))
email_username = address.username + "+show-sender"
stream_to_address = Address(username=email_username, domain=address.domain).addr_spec
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content("TestStreamEmailMessages body")
@ -491,10 +490,9 @@ and other things
self.subscribe(user_profile, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
parts = stream_to_address.split("@")
parts[0] += "+include-footer"
stream_to_address = "@".join(parts)
address = Address(addr_spec=encode_email_address(stream))
email_username = address.username + "+include-footer"
stream_to_address = Address(username=email_username, domain=address.domain).addr_spec
text = """Test message
--
@ -520,10 +518,9 @@ and other things
self.subscribe(user_profile, "Denmark")
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
parts = stream_to_address.split("@")
parts[0] += "+include-quotes"
stream_to_address = "@".join(parts)
address = Address(addr_spec=encode_email_address(stream))
email_username = address.username + "+include-quotes"
stream_to_address = Address(username=email_username, domain=address.domain).addr_spec
text = """Reply
@ -1603,8 +1600,10 @@ class TestEmailMirrorLogAndReport(ZulipTestCase):
stream = get_stream("Denmark", user_profile.realm)
stream_to_address = encode_email_address(stream)
address_parts = stream_to_address.split("@")
scrubbed_address = "X" * len(address_parts[0]) + "@" + address_parts[1]
address = Address(addr_spec=stream_to_address)
scrubbed_address = Address(
username="X" * len(address.username), domain=address.domain
).addr_spec
incoming_valid_message = EmailMessage()
incoming_valid_message.set_content("Test body")
@ -1666,8 +1665,10 @@ class TestEmailMirrorLogAndReport(ZulipTestCase):
# Test for a stream address:
stream_to_address = encode_email_address(stream)
stream_address_parts = stream_to_address.split("@")
scrubbed_stream_address = "X" * len(stream_address_parts[0]) + "@" + stream_address_parts[1]
address = Address(addr_spec=stream_to_address)
scrubbed_stream_address = Address(
username="X" * len(address.username), domain=address.domain
).addr_spec
error_message = "test message {}"
error_message = error_message.format(stream_to_address)

View File

@ -1,5 +1,6 @@
import datetime
import sys
from email.headerregistry import Address
from typing import TYPE_CHECKING, Any, List, Mapping, Optional, Set
from unittest import mock
@ -2308,7 +2309,7 @@ class TestCrossRealmPMs(ZulipTestCase):
return realm
def create_user(self, email: str) -> UserProfile:
subdomain = email.split("@")[1]
subdomain = Address(addr_spec=email).domain
self.register(email, "test", subdomain=subdomain)
# self.register has the side-effect of ending up with a logged in session
# for the new user. We don't want that in these tests.

View File

@ -1414,12 +1414,16 @@ class ScrubRealmTest(ZulipTestCase):
zulip_users = UserProfile.objects.filter(realm=zulip)
for user in zulip_users:
self.assertTrue(re.search("Scrubbed [a-z0-9]{15}", user.full_name))
self.assertTrue(re.search("scrubbed-[a-z0-9]{15}@" + zulip.host, user.email))
self.assertTrue(re.search("scrubbed-[a-z0-9]{15}@" + zulip.host, user.delivery_email))
self.assertRegex(user.full_name, r"^Scrubbed [a-z0-9]{15}$")
self.assertRegex(user.email, rf"^scrubbed-[a-z0-9]{{15}}@{re.escape(zulip.host)}$")
self.assertRegex(
user.delivery_email, rf"^scrubbed-[a-z0-9]{{15}}@{re.escape(zulip.host)}$"
)
lear_users = UserProfile.objects.filter(realm=lear)
for user in lear_users:
self.assertIsNone(re.search("Scrubbed [a-z0-9]{15}", user.full_name))
self.assertIsNone(re.search("scrubbed-[a-z0-9]{15}@" + zulip.host, user.email))
self.assertIsNone(re.search("scrubbed-[a-z0-9]{15}@" + zulip.host, user.delivery_email))
self.assertNotRegex(user.full_name, r"^Scrubbed [a-z0-9]{15}$")
self.assertNotRegex(user.email, rf"^scrubbed-[a-z0-9]{{15}}@{re.escape(zulip.host)}$")
self.assertNotRegex(
user.delivery_email, rf"^scrubbed-[a-z0-9]{{15}}@{re.escape(zulip.host)}$"
)

View File

@ -1,6 +1,7 @@
import logging
import secrets
import urllib
from email.headerregistry import Address
from functools import wraps
from typing import Any, Dict, List, Mapping, Optional, cast
from urllib.parse import urlencode
@ -482,7 +483,7 @@ def remote_user_jwt(request: HttpRequest) -> HttpResponse:
if email_domain is None:
raise JsonableError(_("No organization specified in JSON web token claims"))
email = f"{remote_user}@{email_domain}"
email = Address(username=remote_user, domain=email_domain).addr_spec
try:
realm = get_realm(subdomain)

View File

@ -1,4 +1,5 @@
import sys
from email.headerregistry import Address
from typing import Iterable, Optional, Sequence, Union, cast
from dateutil.parser import parse as dateparser
@ -32,7 +33,6 @@ from zerver.models import (
Realm,
RealmDomain,
UserProfile,
email_to_domain,
get_user_including_cross_realm,
)
@ -97,7 +97,7 @@ def same_realm_zephyr_user(user_profile: UserProfile, email: str) -> bool:
except ValidationError:
return False
domain = email_to_domain(email)
domain = Address(addr_spec=email).domain
# Assumes allow_subdomains=False for all RealmDomain's corresponding to
# these realms.
@ -116,7 +116,9 @@ def same_realm_irc_user(user_profile: UserProfile, email: str) -> bool:
except ValidationError:
return False
domain = email_to_domain(email).replace("irc.", "")
domain = Address(addr_spec=email).domain
if domain.startswith("irc."):
domain = domain[len("irc.") :]
# Assumes allow_subdomains=False for all RealmDomain's corresponding to
# these realms.
@ -131,7 +133,7 @@ def same_realm_jabber_user(user_profile: UserProfile, email: str) -> bool:
# If your Jabber users have a different email domain than the
# Zulip users, this is where you would do any translation.
domain = email_to_domain(email)
domain = Address(addr_spec=email).domain
# Assumes allow_subdomains=False for all RealmDomain's corresponding to
# these realms.

View File

@ -1,3 +1,4 @@
from email.headerregistry import Address
from typing import Any, Dict, List, Optional, Union
from django.conf import settings
@ -441,7 +442,7 @@ def add_bot_backend(
short_name += "-bot"
full_name = check_full_name(full_name_raw)
try:
email = f"{short_name}@{user_profile.realm.get_bot_domain()}"
email = Address(username=short_name, domain=user_profile.realm.get_bot_domain()).addr_spec
except InvalidFakeEmailDomain:
raise JsonableError(
_(
@ -449,6 +450,8 @@ def add_bot_backend(
"Please contact your server administrator."
)
)
except ValueError:
raise JsonableError(_("Bad name or username"))
form = CreateUserForm({"full_name": full_name, "email": email})
if bot_type == UserProfile.EMBEDDED_BOT:
@ -457,8 +460,10 @@ def add_bot_backend(
if service_name not in [bot.name for bot in EMBEDDED_BOTS]:
raise JsonableError(_("Invalid embedded bot name."))
if not form.is_valid():
# We validate client-side as well
if not form.is_valid(): # nocoverage
# coverage note: The similar block above covers the most
# common situation where this might fail, but this case may be
# still possible with an overly long username.
raise JsonableError(_("Bad name or username"))
try:
get_user_by_delivery_email(email, user_profile.realm)

View File

@ -3,6 +3,7 @@ import logging
import re
import shlex
import subprocess
from email.headerregistry import Address
from typing import Optional
import orjson
@ -42,7 +43,7 @@ def webathena_kerberos_login(
user = parsed_cred["cname"]["nameString"][0]
if user in kerberos_alter_egos:
user = kerberos_alter_egos[user]
assert user == user_profile.email.split("@")[0]
assert user == Address(addr_spec=user_profile.email).username
# Limit characters in usernames to valid MIT usernames
# This is important for security since DNS is not secure.
assert re.match(r"^[a-z0-9_.-]+$", user) is not None

View File

@ -1,3 +1,4 @@
from email.headerregistry import Address
from typing import Any, Dict
from django.http import HttpRequest, HttpResponse
@ -93,7 +94,7 @@ def build_pagerduty_formatdict(message: Dict[str, Any]) -> Dict[str, Any]:
if message["data"]["incident"].get("assigned_to_user", None):
assigned_to_user = message["data"]["incident"]["assigned_to_user"]
format_dict["assignee_info"] = AGENT_TEMPLATE.format(
username=assigned_to_user["email"].split("@")[0],
username=Address(addr_spec=assigned_to_user["email"]).username,
url=assigned_to_user["html_url"],
)
else:
@ -102,7 +103,7 @@ def build_pagerduty_formatdict(message: Dict[str, Any]) -> Dict[str, Any]:
if message["data"]["incident"].get("resolved_by_user", None):
resolved_by_user = message["data"]["incident"]["resolved_by_user"]
format_dict["agent_info"] = AGENT_TEMPLATE.format(
username=resolved_by_user["email"].split("@")[0],
username=Address(addr_spec=resolved_by_user["email"]).username,
url=resolved_by_user["html_url"],
)

View File

@ -16,6 +16,7 @@ import binascii
import json
import logging
from abc import ABC, abstractmethod
from email.headerregistry import Address
from typing import (
Any,
Callable,
@ -99,7 +100,6 @@ from zerver.models import (
Realm,
UserProfile,
custom_profile_fields_for_realm,
email_to_username,
get_realm,
get_user_by_delivery_email,
get_user_profile_by_id,
@ -533,7 +533,7 @@ def email_belongs_to_ldap(realm: Realm, email: str) -> bool:
check_ldap_config()
if settings.LDAP_APPEND_DOMAIN:
# Check if the email ends with LDAP_APPEND_DOMAIN
return email.strip().lower().endswith("@" + settings.LDAP_APPEND_DOMAIN)
return Address(addr_spec=email).domain.lower() == settings.LDAP_APPEND_DOMAIN
# If we don't have an LDAP domain, we have to do a lookup for the email.
if find_ldap_users_by_email(email):
@ -652,11 +652,12 @@ class ZulipLDAPAuthBackendBase(ZulipAuthMixin, LDAPBackend):
result = username
if settings.LDAP_APPEND_DOMAIN:
if is_valid_email(username):
if not username.endswith("@" + settings.LDAP_APPEND_DOMAIN):
address = Address(addr_spec=username)
if address.domain != settings.LDAP_APPEND_DOMAIN:
raise ZulipLDAPExceptionOutsideDomain(
f"Email {username} does not match LDAP domain {settings.LDAP_APPEND_DOMAIN}."
)
result = email_to_username(username)
result = address.username
else:
# We can use find_ldap_users_by_email
if is_valid_email(username):
@ -691,7 +692,7 @@ class ZulipLDAPAuthBackendBase(ZulipAuthMixin, LDAPBackend):
username = ldap_user._username
if settings.LDAP_APPEND_DOMAIN:
return "@".join((username, settings.LDAP_APPEND_DOMAIN))
return Address(username=username, domain=settings.LDAP_APPEND_DOMAIN).addr_spec
if settings.LDAP_EMAIL_ATTR is not None:
# Get email from LDAP attributes.

View File

@ -1,4 +1,5 @@
import os
from email.headerregistry import Address
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypedDict
from scripts.lib.zulip_tools import deport
@ -27,9 +28,11 @@ EXTERNAL_HOST_WITHOUT_PORT = deport(EXTERNAL_HOST)
ALLOWED_HOSTS: List[str] = []
# Basic email settings
NOREPLY_EMAIL_ADDRESS = "noreply@" + EXTERNAL_HOST_WITHOUT_PORT
NOREPLY_EMAIL_ADDRESS = Address(username="noreply", domain=EXTERNAL_HOST_WITHOUT_PORT).addr_spec
ADD_TOKENS_TO_NOREPLY_ADDRESS = True
TOKENIZED_NOREPLY_EMAIL_ADDRESS = "noreply-{token}@" + EXTERNAL_HOST_WITHOUT_PORT
TOKENIZED_NOREPLY_EMAIL_ADDRESS = Address(
username="noreply-{token}", domain=EXTERNAL_HOST_WITHOUT_PORT
).addr_spec
PHYSICAL_ADDRESS = ""
FAKE_EMAIL_DOMAIN = EXTERNAL_HOST_WITHOUT_PORT