# -*- coding: utf-8 -*- from __future__ import absolute_import from django.conf import settings from django.core.exceptions import ValidationError from django.http import HttpResponse from django.test import TestCase from zerver.forms import email_is_not_mit_mailing_list from zerver.lib.rate_limiter import ( add_ratelimit_rule, clear_user_history, remove_ratelimit_rule, ) from zerver.lib.actions import compute_mit_user_fullname from zerver.lib.test_classes import ( ZulipTestCase, ) from zerver.models import get_user_profile_by_email from zerver.lib.test_runner import slow import DNS import mock import time import ujson from six.moves import urllib from six.moves import range from six import text_type class MITNameTest(TestCase): def test_valid_hesiod(self): # type: () -> None with mock.patch('DNS.dnslookup', return_value=[['starnine:*:84233:101:Athena Consulting Exchange User,,,:/mit/starnine:/bin/bash']]): self.assertEquals(compute_mit_user_fullname("starnine@mit.edu"), "Athena Consulting Exchange User") with mock.patch('DNS.dnslookup', return_value=[['sipbexch:*:87824:101:Exch Sipb,,,:/mit/sipbexch:/bin/athena/bash']]): self.assertEquals(compute_mit_user_fullname("sipbexch@mit.edu"), "Exch Sipb") def test_invalid_hesiod(self): # type: () -> None with mock.patch('DNS.dnslookup', side_effect=DNS.Base.ServerError('DNS query status: NXDOMAIN', 3)): self.assertEquals(compute_mit_user_fullname("1234567890@mit.edu"), "1234567890@mit.edu") with mock.patch('DNS.dnslookup', side_effect=DNS.Base.ServerError('DNS query status: NXDOMAIN', 3)): self.assertEquals(compute_mit_user_fullname("ec-discuss@mit.edu"), "ec-discuss@mit.edu") def test_mailinglist(self): # type: () -> None with mock.patch('DNS.dnslookup', side_effect=DNS.Base.ServerError('DNS query status: NXDOMAIN', 3)): self.assertRaises(ValidationError, email_is_not_mit_mailing_list, "1234567890@mit.edu") with mock.patch('DNS.dnslookup', side_effect=DNS.Base.ServerError('DNS query status: NXDOMAIN', 3)): self.assertRaises(ValidationError, email_is_not_mit_mailing_list, "ec-discuss@mit.edu") def test_notmailinglist(self): # type: () -> None with mock.patch('DNS.dnslookup', return_value=[['POP IMAP.EXCHANGE.MIT.EDU starnine']]): email_is_not_mit_mailing_list("sipbexch@mit.edu") class RateLimitTests(ZulipTestCase): def setUp(self): # type: () -> None settings.RATE_LIMITING = True add_ratelimit_rule(1, 5) def tearDown(self): # type: () -> None settings.RATE_LIMITING = False remove_ratelimit_rule(1, 5) def send_api_message(self, email, content): # type: (text_type, text_type) -> HttpResponse return self.client_post("/api/v1/messages", {"type": "stream", "to": "Verona", "client": "test suite", "content": content, "subject": "Test subject"}, **self.api_auth(email)) def test_headers(self): # type: () -> None email = "hamlet@zulip.com" user = get_user_profile_by_email(email) clear_user_history(user) result = self.send_api_message(email, "some stuff") self.assertTrue('X-RateLimit-Remaining' in result) self.assertTrue('X-RateLimit-Limit' in result) self.assertTrue('X-RateLimit-Reset' in result) def test_ratelimit_decrease(self): # type: () -> None email = "hamlet@zulip.com" user = get_user_profile_by_email(email) clear_user_history(user) result = self.send_api_message(email, "some stuff") limit = int(result['X-RateLimit-Remaining']) result = self.send_api_message(email, "some stuff 2") newlimit = int(result['X-RateLimit-Remaining']) self.assertEqual(limit, newlimit + 1) @slow('has to sleep to work') def test_hit_ratelimits(self): # type: () -> None email = "cordelia@zulip.com" user = get_user_profile_by_email(email) clear_user_history(user) for i in range(6): result = self.send_api_message(email, "some stuff %s" % (i,)) self.assertEqual(result.status_code, 429) json = ujson.loads(result.content) self.assertEqual(json.get("result"), "error") self.assertIn("API usage exceeded rate limit, try again in", json.get("msg")) self.assertTrue('Retry-After' in result) self.assertIn(result['Retry-After'], json.get("msg")) # We actually wait a second here, rather than force-clearing our history, # to make sure the rate-limiting code automatically forgives a user # after some time has passed. time.sleep(1) result = self.send_api_message(email, "Good message") self.assert_json_success(result)