diff --git a/zerver/lib/email_mirror.py b/zerver/lib/email_mirror.py
index 4ef0a18c47..87a29de059 100644
--- a/zerver/lib/email_mirror.py
+++ b/zerver/lib/email_mirror.py
@@ -1,4 +1,4 @@
-from typing import Any, Dict, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple
 
 import logging
 import re
@@ -17,8 +17,11 @@ from zerver.lib.email_notifications import convert_html_to_markdown
 from zerver.lib.queue import queue_json_publish
 from zerver.lib.redis_utils import get_redis_client
 from zerver.lib.upload import upload_message_file
-from zerver.lib.utils import generate_random_token
+from zerver.lib.utils import generate_random_token, statsd
 from zerver.lib.send_email import FromAddress
+from zerver.lib.rate_limiter import RateLimitedObject, RateLimiterLockingException, \
+    is_ratelimited, incr_ratelimit
+from zerver.lib.exceptions import RateLimited
 from zerver.models import Stream, Recipient, \
     get_user_profile_by_id, get_display_recipient, get_personal_recipient, \
     Message, Realm, UserProfile, get_system_bot, get_user, get_stream_by_id_in_realm
@@ -401,3 +404,43 @@ def mirror_email_message(data: Dict[str, str]) -> Dict[str, str]:
         }
     )
     return {"status": "success"}
+
+# Email mirror rate limiter code:
+
+class RateLimitedRealmMirror(RateLimitedObject):
+    """Rate limiter entity for email mirror traffic addressed to one realm."""
+
+    def __init__(self, realm: Realm) -> None:
+        self.realm = realm
+
+    def key_fragment(self) -> str:
+        # Format the class *name*, not the class object: the latter would
+        # embed its repr ("<class '...'>") in the key.
+        return "emailmirror:{}:{}".format(type(self.realm).__name__, self.realm.id)
+
+    def rules(self) -> List[Tuple[int, int]]:
+        return settings.RATE_LIMITING_MIRROR_REALM_RULES
+
+
+def rate_limit_mirror_by_realm(recipient_realm: Realm) -> None:
+    """Count one incoming email against recipient_realm's mirror rate limit.
+
+    Raises RateLimited if is_ratelimited reports the realm as limited, or
+    if incr_ratelimit fails with RateLimiterLockingException.
+    """
+    # Code based on the rate_limit_user function:
+    entity = RateLimitedRealmMirror(recipient_realm)
+    # The second element of the result is not needed here.
+    ratelimited, _ = is_ratelimited(entity)
+
+    if ratelimited:
+        statsd.incr("ratelimiter.limited.%s.%s" % (type(recipient_realm).__name__,
+                                                   recipient_realm.id))
+        raise RateLimited()
+
+    try:
+        incr_ratelimit(entity)
+    except RateLimiterLockingException:
+        logger.warning("Email mirror rate limiter: Deadlock trying to "
+                       "incr_ratelimit for realm %s", recipient_realm.name)
+        raise RateLimited()
diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py
index 4273acaa7b..430b09d640 100644
--- a/zerver/tests/test_queue_worker.py
+++ b/zerver/tests/test_queue_worker.py
@@ -10,6 +10,8 @@ from mock import patch, MagicMock
 from typing import Any, Callable, Dict, List, Mapping, Tuple
 
 from zerver.lib.actions import encode_email_address
+from zerver.lib.email_mirror import RateLimitedRealmMirror
+from zerver.lib.rate_limiter import RateLimiterLockingException, clear_history
 from zerver.lib.send_email import FromAddress
 from zerver.lib.test_helpers import simulated_queue_client
 from zerver.lib.test_classes import ZulipTestCase
@@ -235,6 +237,64 @@ class WorkerTest(ZulipTestCase):
 
         self.assertEqual(mock_mirror_email.call_count, 3)
 
+    @patch('zerver.worker.queue_processors.mirror_email')
+    @override_settings(RATE_LIMITING_MIRROR_REALM_RULES=[(10, 2)])
+    def test_mirror_worker_rate_limiting(self, mock_mirror_email: MagicMock) -> None:
+        fake_client = self.FakeClient()
+        realm = get_realm('zulip')
+        clear_history(RateLimitedRealmMirror(realm))
+        stream = get_stream('Denmark', realm)
+        stream_to_address = encode_email_address(stream)
+        data = [
+            dict(
+                message=u'\xf3test',
+                time=time.time(),
+                rcpt_to=stream_to_address
+            )
+        ] * 5
+        for element in data:
+            fake_client.queue.append(('email_mirror', element))
+
+        with simulated_queue_client(lambda: fake_client):
+            start_time = time.time()
+            with patch('time.time', return_value=start_time):
+                worker = queue_processors.MirrorWorker()
+                worker.setup()
+                worker.start()
+                # Of the first 5 messages, only 2 should be processed
+                # (the rest being rate-limited):
+                self.assertEqual(mock_mirror_email.call_count, 2)
+
+                # If a new message is sent into the stream mirror, it will get rejected:
+                fake_client.queue.append(('email_mirror', data[0]))
+                worker.start()
+                self.assertEqual(mock_mirror_email.call_count, 2)
+
+                # However, missed message emails don't get rate limited:
+                with self.settings(EMAIL_GATEWAY_PATTERN="%s@example.com"):
+                    address = 'mm' + ('x' * 32) + '@example.com'
+                    event = dict(
+                        message=u'\xf3test',
+                        time=time.time(),
+                        rcpt_to=address
+                    )
+                    fake_client.queue.append(('email_mirror', event))
+                    worker.start()
+                    self.assertEqual(mock_mirror_email.call_count, 3)
+
+            # After some time passes, emails get accepted again:
+            with patch('time.time', return_value=(start_time + 11.0)):
+                fake_client.queue.append(('email_mirror', data[0]))
+                worker.start()
+                self.assertEqual(mock_mirror_email.call_count, 4)
+
+                # If RateLimiterLockingException is thrown, we rate-limit the new message:
+                with patch('zerver.lib.email_mirror.incr_ratelimit',
+                           side_effect=RateLimiterLockingException):
+                    fake_client.queue.append(('email_mirror', data[0]))
+                    worker.start()
+                    self.assertEqual(mock_mirror_email.call_count, 4)
+
     def test_email_sending_worker_retries(self) -> None:
         """Tests the retry_send_email_failures decorator to make sure it
         retries sending the email 3 times and then gives up."""
diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py
index 2dc7498931..3160529e54 100644
--- a/zerver/worker/queue_processors.py
+++ b/zerver/worker/queue_processors.py
@@ -33,7 +33,8 @@ from zerver.lib.url_preview import preview as url_preview
 from zerver.lib.digest import handle_digest_email
 from zerver.lib.send_email import send_future_email, send_email_from_dict, \
     FromAddress, EmailNotDeliveredException, handle_send_email_format_changes
-from zerver.lib.email_mirror import process_message as mirror_email
+from zerver.lib.email_mirror import process_message as mirror_email, rate_limit_mirror_by_realm, \
+    is_missed_message_address, extract_and_validate
 from zerver.lib.streams import access_stream_by_id
 from zerver.tornado.socket import req_redis_key, respond_send_message
 from confirmation.models import Confirmation, create_confirmation_link
@@ -44,6 +45,7 @@ from zerver.lib.outgoing_webhook import do_rest_call, get_outgoing_webhook_servi
 from zerver.models import get_bot_services
 from zulip_bots.lib import extract_query_without_mention
 from zerver.lib.bot_lib import EmbeddedBotHandler, get_bot_handler, EmbeddedBotQuitException
+from zerver.lib.exceptions import RateLimited
 
 import os
 import sys
@@ -480,8 +482,24 @@ class DigestWorker(QueueProcessingWorker): # nocoverage
 @assign_queue('email_mirror')
 class MirrorWorker(QueueProcessingWorker):
     def consume(self, event: Mapping[str, Any]) -> None:
-        mirror_email(email.message_from_string(event["message"]),
-                     rcpt_to=event["rcpt_to"], pre_checked=True)
+        """Deliver one queued email, rate limiting non-missed-message mail per realm."""
+        rcpt_to = event['rcpt_to']
+        # Parse once; the result is needed both for the rejection log
+        # line and for the actual delivery.
+        msg = email.message_from_string(event["message"])
+        if not is_missed_message_address(rcpt_to):
+            # Missed message addresses are one-time use, so we don't need
+            # to worry about emails to them resulting in message spam.
+            recipient_realm = extract_and_validate(rcpt_to)[0].realm
+            try:
+                rate_limit_mirror_by_realm(recipient_realm)
+            except RateLimited:
+                logger.warning("MirrorWorker: Rejecting an email from: %s "
+                               "to realm: %s - rate limited.",
+                               msg['From'], recipient_realm.name)
+                return
+
+        mirror_email(msg, rcpt_to=rcpt_to, pre_checked=True)
 
 @assign_queue('test', queue_type="test")
 class TestWorker(QueueProcessingWorker):
diff --git a/zproject/settings.py b/zproject/settings.py
index c1a61053b3..558037e852 100644
--- a/zproject/settings.py
+++ b/zproject/settings.py
@@ -687,6 +687,13 @@ CACHES = {
 RATE_LIMITING_RULES = [
     (60, 200),  # 200 requests max every minute
 ]
+
+RATE_LIMITING_MIRROR_REALM_RULES = [
+    (60, 50),  # 50 emails per minute
+    (300, 120),  # 120 emails per 5 minutes
+    (3600, 600),  # 600 emails per hour
+]
+
 DEBUG_RATE_LIMITING = DEBUG
 REDIS_PASSWORD = get_secret('redis_password')
 