email_mirror: Ignore stream_name part of receiving address.

To prepare for changing how the stream name gets encoded into mirror
email addresses while making sure old addresses keep working, we ignore
the stream_name part when receiving emails into the mirror and we only
look at the email_token to identify into which stream to mirror the
email.
This commit is contained in:
Mateusz Mandera 2019-03-17 10:36:16 +01:00 committed by Tim Abbott
parent 898798a352
commit a64a075ff1
3 changed files with 38 additions and 27 deletions

View File

@ -4480,9 +4480,9 @@ def get_email_gateway_message_string_from_address(address: str) -> Optional[str]
return msg_string
def decode_email_address(email: str) -> Optional[Tuple[str, str, bool]]:
def decode_email_address(email: str) -> Optional[Tuple[str, bool]]:
# Perform the reverse of encode_email_address. Returns a tuple of
# (streamname, email_token, show_sender)
# (email_token, show_sender)
msg_string = get_email_gateway_message_string_from_address(email)
if msg_string is None:
return None
@ -4500,8 +4500,7 @@ def decode_email_address(email: str) -> Optional[Tuple[str, str, bool]]:
else:
encoded_stream_name, token = msg_string.split('+')
stream_name = re.sub(r"%\d{4}", lambda x: chr(int(x.group(0)[1:])), encoded_stream_name)
return stream_name, token, show_sender
return token, show_sender
SubHelperT = Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]

View File

@ -208,13 +208,6 @@ def send_zulip(sender: str, stream: Stream, topic: str, content: str) -> None:
truncate_body(content),
email_gateway=True)
def valid_stream(stream_name: str, token: str) -> bool:
try:
stream = Stream.objects.get(email_token=token)
return stream.name.lower() == stream_name.lower()
except Stream.DoesNotExist:
return False
def get_message_part_by_type(message: message.Message, content_type: str) -> Optional[str]:
charsets = message.get_charsets()
@ -298,12 +291,14 @@ def extract_and_validate(email: str) -> Tuple[Stream, bool]:
temp = decode_email_address(email)
if temp is None:
raise ZulipEmailForwardError("Malformed email recipient " + email)
stream_name, token, show_sender = temp
token, show_sender = temp
if not valid_stream(stream_name, token):
try:
stream = Stream.objects.get(email_token=token)
except Stream.DoesNotExist:
raise ZulipEmailForwardError("Bad stream token from email recipient " + email)
return Stream.objects.get(email_token=token), show_sender
return stream, show_sender
def find_emailgateway_recipient(message: message.Message) -> str:
# We can't use Delivered-To; if there is a X-Gm-Original-To

View File

@ -57,9 +57,8 @@ class TestEncodeDecode(ZulipTestCase):
self.assertTrue(email_address.endswith('@testserver'))
tup = decode_email_address(email_address)
assert tup is not None
(decoded_stream_name, token, show_sender) = tup
token, show_sender = tup
self.assertFalse(show_sender)
self.assertEqual(decoded_stream_name, stream_name)
self.assertEqual(token, stream.email_token)
parts = email_address.split('@')
@ -67,25 +66,22 @@ class TestEncodeDecode(ZulipTestCase):
email_address_show = '@'.join(parts)
tup = decode_email_address(email_address_show)
assert tup is not None
(decoded_stream_name, token, show_sender) = tup
token, show_sender = tup
self.assertTrue(show_sender)
self.assertEqual(decoded_stream_name, stream_name)
self.assertEqual(token, stream.email_token)
email_address_dots = email_address.replace('+', '.')
tup = decode_email_address(email_address_dots)
assert tup is not None
(decoded_stream_name, token, show_sender) = tup
token, show_sender = tup
self.assertFalse(show_sender)
self.assertEqual(decoded_stream_name, stream_name)
self.assertEqual(token, stream.email_token)
email_address_dots_show = email_address_show.replace('+', '.')
tup = decode_email_address(email_address_dots_show)
assert tup is not None
(decoded_stream_name, token, show_sender) = tup
token, show_sender = tup
self.assertTrue(show_sender)
self.assertEqual(decoded_stream_name, stream_name)
self.assertEqual(token, stream.email_token)
email_address = email_address.replace('@testserver', '@zulip.org')
@ -96,20 +92,29 @@ class TestEncodeDecode(ZulipTestCase):
with self.settings(EMAIL_GATEWAY_EXTRA_PATTERN_HACK='@zulip.org'):
tup = decode_email_address(email_address)
assert tup is not None
(decoded_stream_name, token, show_sender) = tup
token, show_sender = tup
self.assertFalse(show_sender)
self.assertEqual(decoded_stream_name, stream_name)
self.assertEqual(token, stream.email_token)
tup = decode_email_address(email_address_show)
assert tup is not None
(decoded_stream_name, token, show_sender) = tup
token, show_sender = tup
self.assertTrue(show_sender)
self.assertEqual(decoded_stream_name, stream_name)
self.assertEqual(token, stream.email_token)
self.assertEqual(decode_email_address('bogus'), None)
def test_decode_ignores_stream_name(self) -> None:
stream = get_stream("Denmark", get_realm("zulip"))
stream_to_address = encode_email_address(stream)
stream_to_address = stream_to_address.replace("Denmark", "Some_name")
# get the email_token:
tup = decode_email_address(stream_to_address)
assert tup is not None
token = tup[0]
self.assertEqual(token, stream.email_token)
class TestEmailMirrorLibrary(ZulipTestCase):
def test_get_missed_message_token(self) -> None:
@ -643,7 +648,11 @@ class TestEmailMirrorTornadoView(ZulipTestCase):
def test_error_to_stream_with_wrong_address(self) -> None:
stream = get_stream("Denmark", get_realm("zulip"))
stream_to_address = encode_email_address(stream)
stream_to_address = stream_to_address.replace("Denmark", "Wrong_stream")
# get the email_token:
tup = decode_email_address(stream_to_address)
assert tup is not None
token = tup[0]
stream_to_address = stream_to_address.replace(token, "Wrong_token")
result = self.send_offline_message(stream_to_address, self.example_email('hamlet'))
self.assert_json_error(
@ -651,6 +660,14 @@ class TestEmailMirrorTornadoView(ZulipTestCase):
"5.1.1 Bad destination mailbox address: "
"Please use the address specified in your Streams page.")
def test_success_to_stream_with_good_token_wrong_stream_name(self) -> None:
stream = get_stream("Denmark", get_realm("zulip"))
stream_to_address = encode_email_address(stream)
stream_to_address = stream_to_address.replace("Denmark", "Wrong_name")
result = self.send_offline_message(stream_to_address, self.example_email('hamlet'))
self.assert_json_success(result)
def test_success_to_private(self) -> None:
mm_address = self.send_private_message()
result = self.send_offline_message(mm_address, self.example_email('cordelia'))