email_mirror: Add realm-based rate limiting.

Closes #2420

We add rate limiting (max X emails withing Y seconds per realm) to the
email mirror. By creating RateLimitedRealmMirror class, inheriting from
RateLimitedObject, and rate_limit_mirror_by_realm function, following a
mechanism used by rate_limit_user, we're able to have this
implementation mostly rely on the already existing, and proven over
time, rate_limiter.py code. The rules are configurable in settings.py in
RATE_LIMITING_MIRROR_REALM_RULES, analogically to RATE_LIMITING_RULES.

Rate limit verification happens in the MirrorWorker in
queue_processors.py. We don't rate limit missed message emails, as due
to using one time addresses, they're not a spam threat.

test_mirror_worker is adapted to the altered MirrorWorker code and a new
test - test_mirror_worker_rate_limiting is added in test_queue_worker.py
to provide coverage for these changes.
This commit is contained in:
Mateusz Mandera 2019-03-16 11:39:09 +01:00 committed by Tim Abbott
parent 386813f42b
commit 1901775383
4 changed files with 120 additions and 4 deletions

View File

@ -1,4 +1,4 @@
from typing import Any, Dict, Optional, Tuple
from typing import Any, Dict, Optional, Tuple, List
import logging
import re
@ -17,8 +17,11 @@ from zerver.lib.email_notifications import convert_html_to_markdown
from zerver.lib.queue import queue_json_publish
from zerver.lib.redis_utils import get_redis_client
from zerver.lib.upload import upload_message_file
from zerver.lib.utils import generate_random_token
from zerver.lib.utils import generate_random_token, statsd
from zerver.lib.send_email import FromAddress
from zerver.lib.rate_limiter import RateLimitedObject, RateLimiterLockingException, \
is_ratelimited, incr_ratelimit
from zerver.lib.exceptions import RateLimited
from zerver.models import Stream, Recipient, \
get_user_profile_by_id, get_display_recipient, get_personal_recipient, \
Message, Realm, UserProfile, get_system_bot, get_user, get_stream_by_id_in_realm
@ -401,3 +404,33 @@ def mirror_email_message(data: Dict[str, str]) -> Dict[str, str]:
}
)
return {"status": "success"}
# Email mirror rate limiter code:
class RateLimitedRealmMirror(RateLimitedObject):
def __init__(self, realm: Realm) -> None:
self.realm = realm
def key_fragment(self) -> str:
return "emailmirror:{}:{}".format(type(self.realm), self.realm.id)
def rules(self) -> List[Tuple[int, int]]:
return settings.RATE_LIMITING_MIRROR_REALM_RULES
def rate_limit_mirror_by_realm(recipient_realm: Realm) -> None:
# Code based on the rate_limit_user function:
entity = RateLimitedRealmMirror(recipient_realm)
ratelimited, time = is_ratelimited(entity)
if ratelimited:
statsd.incr("ratelimiter.limited.%s.%s" % (type(recipient_realm),
recipient_realm.id))
raise RateLimited()
try:
incr_ratelimit(entity)
except RateLimiterLockingException:
logger.warning("Email mirror rate limiter: Deadlock trying to "
"incr_ratelimit for realm %s" % (recipient_realm.name,))
raise RateLimited()

View File

@ -10,6 +10,8 @@ from mock import patch, MagicMock
from typing import Any, Callable, Dict, List, Mapping, Tuple
from zerver.lib.actions import encode_email_address
from zerver.lib.email_mirror import RateLimitedRealmMirror
from zerver.lib.rate_limiter import RateLimiterLockingException, clear_history
from zerver.lib.send_email import FromAddress
from zerver.lib.test_helpers import simulated_queue_client
from zerver.lib.test_classes import ZulipTestCase
@ -235,6 +237,64 @@ class WorkerTest(ZulipTestCase):
self.assertEqual(mock_mirror_email.call_count, 3)
@patch('zerver.worker.queue_processors.mirror_email')
@override_settings(RATE_LIMITING_MIRROR_REALM_RULES=[(10, 2)])
def test_mirror_worker_rate_limiting(self, mock_mirror_email: MagicMock) -> None:
fake_client = self.FakeClient()
realm = get_realm('zulip')
clear_history(RateLimitedRealmMirror(realm))
stream = get_stream('Denmark', realm)
stream_to_address = encode_email_address(stream)
data = [
dict(
message=u'\xf3test',
time=time.time(),
rcpt_to=stream_to_address
)
] * 5
for element in data:
fake_client.queue.append(('email_mirror', element))
with simulated_queue_client(lambda: fake_client):
start_time = time.time()
with patch('time.time', return_value=start_time):
worker = queue_processors.MirrorWorker()
worker.setup()
worker.start()
# Of the first 5 messages, only 2 should be processed
# (the rest being rate-limited):
self.assertEqual(mock_mirror_email.call_count, 2)
# If a new message is sent into the stream mirror, it will get rejected:
fake_client.queue.append(('email_mirror', data[0]))
worker.start()
self.assertEqual(mock_mirror_email.call_count, 2)
# However, missed message emails don't get rate limited:
with self.settings(EMAIL_GATEWAY_PATTERN="%s@example.com"):
address = 'mm' + ('x' * 32) + '@example.com'
event = dict(
message=u'\xf3test',
time=time.time(),
rcpt_to=address
)
fake_client.queue.append(('email_mirror', event))
worker.start()
self.assertEqual(mock_mirror_email.call_count, 3)
# After some times passes, emails get accepted again:
with patch('time.time', return_value=(start_time + 11.0)):
fake_client.queue.append(('email_mirror', data[0]))
worker.start()
self.assertEqual(mock_mirror_email.call_count, 4)
# If RateLimiterLockingException is thrown, we rate-limit the new message:
with patch('zerver.lib.email_mirror.incr_ratelimit',
side_effect=RateLimiterLockingException):
fake_client.queue.append(('email_mirror', data[0]))
worker.start()
self.assertEqual(mock_mirror_email.call_count, 4)
def test_email_sending_worker_retries(self) -> None:
"""Tests the retry_send_email_failures decorator to make sure it
retries sending the email 3 times and then gives up."""

View File

@ -33,7 +33,8 @@ from zerver.lib.url_preview import preview as url_preview
from zerver.lib.digest import handle_digest_email
from zerver.lib.send_email import send_future_email, send_email_from_dict, \
FromAddress, EmailNotDeliveredException, handle_send_email_format_changes
from zerver.lib.email_mirror import process_message as mirror_email
from zerver.lib.email_mirror import process_message as mirror_email, rate_limit_mirror_by_realm, \
is_missed_message_address, extract_and_validate
from zerver.lib.streams import access_stream_by_id
from zerver.tornado.socket import req_redis_key, respond_send_message
from confirmation.models import Confirmation, create_confirmation_link
@ -44,6 +45,7 @@ from zerver.lib.outgoing_webhook import do_rest_call, get_outgoing_webhook_servi
from zerver.models import get_bot_services
from zulip_bots.lib import extract_query_without_mention
from zerver.lib.bot_lib import EmbeddedBotHandler, get_bot_handler, EmbeddedBotQuitException
from zerver.lib.exceptions import RateLimited
import os
import sys
@ -480,8 +482,22 @@ class DigestWorker(QueueProcessingWorker): # nocoverage
@assign_queue('email_mirror')
class MirrorWorker(QueueProcessingWorker):
def consume(self, event: Mapping[str, Any]) -> None:
rcpt_to = event['rcpt_to']
if not is_missed_message_address(rcpt_to):
# Missed message addresses are one-time use, so we don't need
# to worry about emails to them resulting in message spam.
recipient_realm = extract_and_validate(rcpt_to)[0].realm
try:
rate_limit_mirror_by_realm(recipient_realm)
except RateLimited:
msg = email.message_from_string(event["message"])
logger.warning("MirrorWorker: Rejecting an email from: %s "
"to realm: %s - rate limited."
% (msg['From'], recipient_realm.name))
return
mirror_email(email.message_from_string(event["message"]),
rcpt_to=event["rcpt_to"], pre_checked=True)
rcpt_to=rcpt_to, pre_checked=True)
@assign_queue('test', queue_type="test")
class TestWorker(QueueProcessingWorker):

View File

@ -687,6 +687,13 @@ CACHES = {
RATE_LIMITING_RULES = [
(60, 200), # 200 requests max every minute
]
RATE_LIMITING_MIRROR_REALM_RULES = [
(60, 50), # 50 emails per minute
(300, 120), # 120 emails per 5 minutes
(3600, 600), # 600 emails per hour
]
DEBUG_RATE_LIMITING = DEBUG
REDIS_PASSWORD = get_secret('redis_password')