Annotate zerver/tests/tests.py.

This commit is contained in:
Tim Abbott 2016-06-15 14:47:22 -07:00
parent 45883386ce
commit 6723525fd3
2 changed files with 130 additions and 20 deletions

View File

@ -19,7 +19,6 @@ zproject/settings.py
zproject/test_settings.py
zerver/migrations/
zerver/tests/test_bugdown.py
zerver/tests/tests.py
zerver/tests/test_email_mirror.py
zerver/tests/test_decorators.py
zerver/tests/test_upload.py

View File

@ -2,6 +2,9 @@
from __future__ import absolute_import
from __future__ import print_function
from typing import Any, Callable, Dict, Iterable, List, Tuple, TypeVar
from django.http import HttpResponse
from django.test import TestCase
from zerver.lib.test_helpers import (
@ -33,6 +36,7 @@ from zerver.worker import queue_processors
from django.conf import settings
from django.core import mail
from six import text_type
import datetime
import os
import re
@ -41,6 +45,7 @@ import time
import ujson
def bail(msg):
# type: (str) -> None
print('\nERROR: %s\n' % (msg,))
sys.exit(1)
@ -56,7 +61,10 @@ try:
except ImportError:
bail('The Pygments library is required to run the backend test suite.')
K = TypeVar('K')
V = TypeVar('V')
def find_dict(lst, k, v):
# type: (Iterable[Dict[K, V]], K, V) -> Dict[K, V]
for dct in lst:
if dct[k] == v:
return dct
@ -64,6 +72,7 @@ def find_dict(lst, k, v):
class SlowQueryTest(TestCase):
def test_is_slow_query(self):
# type: () -> None
self.assertFalse(is_slow_query(1.1, '/some/random/url'))
self.assertTrue(is_slow_query(2, '/some/random/url'))
self.assertTrue(is_slow_query(5.1, '/activity'))
@ -77,13 +86,15 @@ class SlowQueryTest(TestCase):
class RealmTest(AuthedTestCase):
def assert_user_profile_cache_gets_new_name(self, email, new_realm_name):
# type: (text_type, text_type) -> None
user_profile = get_user_profile_by_email(email)
self.assertEqual(user_profile.realm.name, new_realm_name)
def test_do_set_realm_name_caching(self):
# The main complicated thing about setting realm names is fighting the
# cache, and we start by populating the cache for Hamlet, and we end
# by checking the cache to ensure that the new value is there.
# type: () -> None
"""The main complicated thing about setting realm names is fighting the
cache, and we start by populating the cache for Hamlet, and we end
by checking the cache to ensure that the new value is there."""
get_user_profile_by_email('hamlet@zulip.com')
realm = get_realm('zulip.com')
new_name = 'Zed You Elle Eye Pea'
@ -92,9 +103,10 @@ class RealmTest(AuthedTestCase):
self.assert_user_profile_cache_gets_new_name('hamlet@zulip.com', new_name)
def test_do_set_realm_name_events(self):
# type: () -> None
realm = get_realm('zulip.com')
new_name = 'Puliz'
events = []
events = [] # type: List[Dict[str, Any]]
with tornado_redirected_to_list(events):
do_set_realm_name(realm, new_name)
event = events[0]['event']
@ -106,6 +118,7 @@ class RealmTest(AuthedTestCase):
))
def test_realm_name_api(self):
# type: () -> None
new_name = 'Zulip: Worldwide Exporter of APIs'
email = 'cordelia@zulip.com'
@ -120,6 +133,7 @@ class RealmTest(AuthedTestCase):
self.assertEqual(realm.name, new_name)
def test_admin_restrictions_for_changing_realm_name(self):
# type: () -> None
new_name = 'Mice will play while the cat is away'
email = 'othello@zulip.com'
@ -132,10 +146,12 @@ class RealmTest(AuthedTestCase):
self.assert_json_error(result, 'Must be a realm administrator')
def test_do_deactivate_realm(self):
# The main complicated thing about deactivating realm names is updating the
# cache, and we start by populating the cache for Hamlet, and we end
# by checking the cache to ensure that his realm appears to be deactivated.
# You can make this test fail by disabling cache.flush_realm().
# type: () -> None
"""The main complicated thing about deactivating realm names is
updating the cache, and we start by populating the cache for
Hamlet, and we end by checking the cache to ensure that his
realm appears to be deactivated. You can make this test fail
by disabling cache.flush_realm()."""
get_user_profile_by_email('hamlet@zulip.com')
realm = get_realm('zulip.com')
do_deactivate_realm(realm)
@ -144,6 +160,7 @@ class RealmTest(AuthedTestCase):
class PermissionTest(AuthedTestCase):
def test_get_admin_users(self):
# type: () -> None
user_profile = get_user_profile_by_email('hamlet@zulip.com')
do_change_is_admin(user_profile, False)
admin_users = user_profile.realm.get_admin_users()
@ -153,6 +170,7 @@ class PermissionTest(AuthedTestCase):
self.assertTrue(user_profile in admin_users)
def test_admin_api(self):
# type: () -> None
self.login('hamlet@zulip.com')
admin = get_user_profile_by_email('hamlet@zulip.com')
user = get_user_profile_by_email('othello@zulip.com')
@ -171,7 +189,7 @@ class PermissionTest(AuthedTestCase):
# Giveth
req = dict(is_admin=ujson.dumps(True))
events = []
events = [] # type: List[Dict[str, Any]]
with tornado_redirected_to_list(events):
result = self.client_patch('/json/users/othello@zulip.com', req)
self.assert_json_success(result)
@ -201,19 +219,23 @@ class PermissionTest(AuthedTestCase):
class WorkerTest(TestCase):
class FakeClient(object):
def __init__(self):
self.consumers = {}
self.queue = []
# type: () -> None
self.consumers = {} # type: Dict[str, Callable]
self.queue = [] # type: List[Tuple[str, Dict[str, Any]]]
def register_json_consumer(self, queue_name, callback):
# type: (str, Callable) -> None
self.consumers[queue_name] = callback
def start_consuming(self):
# type: () -> None
for queue_name, data in self.queue:
callback = self.consumers[queue_name]
callback(data)
def test_UserActivityWorker(self):
# type: () -> None
fake_client = self.FakeClient()
user = get_user_profile_by_email('hamlet@zulip.com')
@ -242,16 +264,20 @@ class WorkerTest(TestCase):
self.assertTrue(activity_records[0].count, 1)
def test_error_handling(self):
# type: () -> None
processed = []
@queue_processors.assign_queue('unreliable_worker')
class UnreliableWorker(queue_processors.QueueProcessingWorker):
def consume(self, data):
# type: (str) -> None
if data == 'unexpected behaviour':
raise Exception('Worker task not performing as expected!')
processed.append(data)
def _log_problem(self):
# type: () -> None
# keep the tests quiet
pass
@ -276,18 +302,23 @@ class WorkerTest(TestCase):
self.assertEqual(event, 'unexpected behaviour')
def test_worker_noname(self):
# type: () -> None
class TestWorker(queue_processors.QueueProcessingWorker):
def __init__(self):
# type: () -> None
super(TestWorker, self).__init__()
def consume(self, data):
# type: (str) -> None
pass
with self.assertRaises(queue_processors.WorkerDeclarationException):
TestWorker()
def test_worker_noconsume(self):
# type: () -> None
@queue_processors.assign_queue('test_worker')
class TestWorker(queue_processors.QueueProcessingWorker):
def __init__(self):
# type: () -> None
super(TestWorker, self).__init__()
with self.assertRaises(queue_processors.WorkerDeclarationException):
@ -296,6 +327,7 @@ class WorkerTest(TestCase):
class ActivityTest(AuthedTestCase):
def test_activity(self):
# type: () -> None
self.login("hamlet@zulip.com")
client, _ = Client.objects.get_or_create(name='website')
query = '/json/users/me/pointer'
@ -316,6 +348,7 @@ class ActivityTest(AuthedTestCase):
class UserProfileTest(TestCase):
def test_get_emails_from_user_ids(self):
# type: () -> None
hamlet = get_user_profile_by_email('hamlet@zulip.com')
othello = get_user_profile_by_email('othello@zulip.com')
dct = get_emails_from_user_ids([hamlet.id, othello.id])
@ -324,6 +357,7 @@ class UserProfileTest(TestCase):
class UserChangesTest(AuthedTestCase):
def test_update_api_key(self):
# type: () -> None
email = "hamlet@zulip.com"
self.login(email)
user = get_user_profile_by_email(email)
@ -337,6 +371,7 @@ class UserChangesTest(AuthedTestCase):
class ActivateTest(AuthedTestCase):
def test_basics(self):
# type: () -> None
user = get_user_profile_by_email('hamlet@zulip.com')
do_deactivate_user(user)
self.assertFalse(user.is_active)
@ -344,6 +379,7 @@ class ActivateTest(AuthedTestCase):
self.assertTrue(user.is_active)
def test_api(self):
# type: () -> None
admin = get_user_profile_by_email('othello@zulip.com')
do_change_is_admin(admin, True)
self.login('othello@zulip.com')
@ -367,12 +403,14 @@ class ActivateTest(AuthedTestCase):
class BotTest(AuthedTestCase):
def assert_num_bots_equal(self, count):
# type: (int) -> None
result = self.client.get("/json/bots")
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(count, len(json['bots']))
def create_bot(self, **extras):
# type: (**Any) -> Dict[str, Any]
bot_info = {
'full_name': 'The Bot of Hamlet',
'short_name': 'hambot',
@ -383,13 +421,15 @@ class BotTest(AuthedTestCase):
return ujson.loads(result.content)
def deactivate_bot(self):
# type: () -> None
result = self.client_delete("/json/bots/hambot-bot@zulip.com")
self.assert_json_success(result)
def test_add_bot(self):
# type: () -> None
self.login("hamlet@zulip.com")
self.assert_num_bots_equal(0)
events = []
events = [] # type: List[Dict[str, Any]]
with tornado_redirected_to_list(events):
result = self.create_bot()
self.assert_num_bots_equal(1)
@ -413,6 +453,7 @@ class BotTest(AuthedTestCase):
)
def test_add_bot_with_default_sending_stream(self):
# type: () -> None
self.login("hamlet@zulip.com")
self.assert_num_bots_equal(0)
result = self.create_bot(default_sending_stream='Denmark')
@ -423,6 +464,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(profile.default_sending_stream.name, 'Denmark')
def test_add_bot_with_default_sending_stream_not_subscribed(self):
# type: () -> None
self.login("hamlet@zulip.com")
self.assert_num_bots_equal(0)
result = self.create_bot(default_sending_stream='Rome')
@ -433,6 +475,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(profile.default_sending_stream.name, 'Rome')
def test_add_bot_with_default_sending_stream_private_allowed(self):
# type: () -> None
self.login("hamlet@zulip.com")
user_profile = get_user_profile_by_email("hamlet@zulip.com")
stream = get_stream("Denmark", user_profile.realm)
@ -440,7 +483,7 @@ class BotTest(AuthedTestCase):
do_make_stream_private(user_profile.realm, "Denmark")
self.assert_num_bots_equal(0)
events = []
events = [] # type: List[Dict[str, Any]]
with tornado_redirected_to_list(events):
result = self.create_bot(default_sending_stream='Denmark')
self.assert_num_bots_equal(1)
@ -469,6 +512,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(event['users'], (user_profile.id,))
def test_add_bot_with_default_sending_stream_private_denied(self):
# type: () -> None
self.login("hamlet@zulip.com")
user_profile = get_user_profile_by_email("hamlet@zulip.com")
stream = get_stream("Denmark", user_profile.realm)
@ -484,6 +528,7 @@ class BotTest(AuthedTestCase):
self.assert_json_error(result, 'Insufficient permission')
def test_add_bot_with_default_events_register_stream(self):
# type: () -> None
self.login("hamlet@zulip.com")
self.assert_num_bots_equal(0)
result = self.create_bot(default_events_register_stream='Denmark')
@ -494,6 +539,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(profile.default_events_register_stream.name, 'Denmark')
def test_add_bot_with_default_events_register_stream_private_allowed(self):
# type: () -> None
self.login("hamlet@zulip.com")
user_profile = get_user_profile_by_email("hamlet@zulip.com")
stream = get_stream("Denmark", user_profile.realm)
@ -501,7 +547,7 @@ class BotTest(AuthedTestCase):
do_make_stream_private(user_profile.realm, "Denmark")
self.assert_num_bots_equal(0)
events = []
events = [] # type: List[Dict[str, Any]]
with tornado_redirected_to_list(events):
result = self.create_bot(default_events_register_stream='Denmark')
self.assert_num_bots_equal(1)
@ -530,6 +576,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(event['users'], (user_profile.id,))
def test_add_bot_with_default_events_register_stream_private_denied(self):
# type: () -> None
self.login("hamlet@zulip.com")
user_profile = get_user_profile_by_email("hamlet@zulip.com")
stream = get_stream("Denmark", user_profile.realm)
@ -546,6 +593,7 @@ class BotTest(AuthedTestCase):
self.assert_json_error(result, 'Insufficient permission')
def test_add_bot_with_default_all_public_streams(self):
# type: () -> None
self.login("hamlet@zulip.com")
self.assert_num_bots_equal(0)
result = self.create_bot(default_all_public_streams=ujson.dumps(True))
@ -556,6 +604,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(profile.default_all_public_streams, True)
def test_deactivate_bot(self):
# type: () -> None
self.login("hamlet@zulip.com")
self.assert_num_bots_equal(0)
self.create_bot()
@ -566,7 +615,8 @@ class BotTest(AuthedTestCase):
self.assert_num_bots_equal(0)
def test_deactivate_bogus_bot(self):
# Deleting a bogus bot will succeed silently.
# type: () -> None
"""Deleting a bogus bot will succeed silently."""
self.login("hamlet@zulip.com")
self.assert_num_bots_equal(0)
self.create_bot()
@ -576,7 +626,8 @@ class BotTest(AuthedTestCase):
self.assert_num_bots_equal(1)
def test_bot_deactivation_attacks(self):
# You cannot deactivate somebody else's bot.
# type: () -> None
"""You cannot deactivate somebody else's bot."""
self.login("hamlet@zulip.com")
self.assert_num_bots_equal(0)
self.create_bot()
@ -603,6 +654,7 @@ class BotTest(AuthedTestCase):
self.assert_num_bots_equal(1)
def test_bot_permissions(self):
# type: () -> None
self.login("hamlet@zulip.com")
self.assert_num_bots_equal(0)
self.create_bot()
@ -621,11 +673,13 @@ class BotTest(AuthedTestCase):
self.assert_json_error(result, 'Insufficient permission')
def get_bot(self):
# type: () -> Dict[str, Any]
result = self.client.get("/json/bots")
bots = ujson.loads(result.content)['bots']
return bots[0]
def test_update_api_key(self):
# type: () -> None
self.login("hamlet@zulip.com")
self.create_bot()
bot = self.get_bot()
@ -638,6 +692,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(new_api_key, bot['api_key'])
def test_patch_bot_full_name(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -658,6 +713,7 @@ class BotTest(AuthedTestCase):
self.assertEqual('Fred', bot['full_name'])
def test_patch_bot_to_stream(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -678,6 +734,7 @@ class BotTest(AuthedTestCase):
self.assertEqual('Denmark', bot['default_sending_stream'])
def test_patch_bot_to_stream_not_subscribed(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -698,6 +755,7 @@ class BotTest(AuthedTestCase):
self.assertEqual('Rome', bot['default_sending_stream'])
def test_patch_bot_to_stream_none(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -718,6 +776,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(None, bot['default_sending_stream'])
def test_patch_bot_to_stream_private_allowed(self):
# type: () -> None
self.login("hamlet@zulip.com")
user_profile = get_user_profile_by_email("hamlet@zulip.com")
stream = get_stream("Denmark", user_profile.realm)
@ -744,6 +803,7 @@ class BotTest(AuthedTestCase):
self.assertEqual('Denmark', bot['default_sending_stream'])
def test_patch_bot_to_stream_private_denied(self):
# type: () -> None
self.login("hamlet@zulip.com")
user_profile = get_user_profile_by_email("hamlet@zulip.com")
stream = get_stream("Denmark", user_profile.realm)
@ -764,6 +824,7 @@ class BotTest(AuthedTestCase):
self.assert_json_error(result, 'Insufficient permission')
def test_patch_bot_to_stream_not_found(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -778,6 +839,7 @@ class BotTest(AuthedTestCase):
self.assert_json_error(result, 'No such stream \'missing\'')
def test_patch_bot_events_register_stream(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -798,6 +860,7 @@ class BotTest(AuthedTestCase):
self.assertEqual('Denmark', bot['default_events_register_stream'])
def test_patch_bot_events_register_stream_allowed(self):
# type: () -> None
self.login("hamlet@zulip.com")
user_profile = get_user_profile_by_email("hamlet@zulip.com")
stream = get_stream("Denmark", user_profile.realm)
@ -823,6 +886,7 @@ class BotTest(AuthedTestCase):
self.assertEqual('Denmark', bot['default_events_register_stream'])
def test_patch_bot_events_register_stream_denied(self):
# type: () -> None
self.login("hamlet@zulip.com")
user_profile = get_user_profile_by_email("hamlet@zulip.com")
stream = get_stream("Denmark", user_profile.realm)
@ -842,6 +906,7 @@ class BotTest(AuthedTestCase):
self.assert_json_error(result, 'Insufficient permission')
def test_patch_bot_events_register_stream_none(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -862,6 +927,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(None, bot['default_events_register_stream'])
def test_patch_bot_events_register_stream_not_found(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -876,6 +942,7 @@ class BotTest(AuthedTestCase):
self.assert_json_error(result, 'No such stream \'missing\'')
def test_patch_bot_default_all_public_streams_true(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -896,6 +963,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(bot['default_all_public_streams'], True)
def test_patch_bot_default_all_public_streams_false(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -916,6 +984,7 @@ class BotTest(AuthedTestCase):
self.assertEqual(bot['default_all_public_streams'], False)
def test_patch_bot_via_post(self):
# type: () -> None
self.login("hamlet@zulip.com")
bot_info = {
'full_name': 'The Bot of Hamlet',
@ -937,7 +1006,8 @@ class BotTest(AuthedTestCase):
self.assertEqual('Fred', bot['full_name'])
def test_patch_bogus_bot(self):
# Deleting a bogus bot will succeed silently.
# type: () -> None
"""Deleting a bogus bot will succeed silently."""
self.login("hamlet@zulip.com")
self.create_bot()
bot_info = {
@ -950,6 +1020,7 @@ class BotTest(AuthedTestCase):
class ChangeSettingsTest(AuthedTestCase):
def post_with_params(self, modified_params):
# type: (Dict[str, text_type]) -> HttpResponse
post_params = {"full_name": "Foo Bar",
"old_password": initial_password("hamlet@zulip.com"),
"new_password": "foobar1", "confirm_password": "foobar1",
@ -958,9 +1029,11 @@ class ChangeSettingsTest(AuthedTestCase):
return self.client.post("/json/settings/change", dict(post_params))
def check_well_formed_change_settings_response(self, result):
# type: (Dict[str, Any]) -> None
self.assertIn("full_name", result)
def check_for_toggle_param(self, pattern, param):
# type: (str, str) -> None
self.login("hamlet@zulip.com")
user_profile = get_user_profile_by_email("hamlet@zulip.com")
json_result = self.client.post(pattern,
@ -978,6 +1051,7 @@ class ChangeSettingsTest(AuthedTestCase):
self.assertEqual(getattr(user_profile, param), False)
def test_successful_change_settings(self):
# type: () -> None
"""
A call to /json/settings/change with valid parameters changes the user's
settings correctly and returns correct values.
@ -996,6 +1070,7 @@ class ChangeSettingsTest(AuthedTestCase):
# This is basically a don't-explode test.
def test_notify_settings(self):
# type: () -> None
self.check_for_toggle_param("/json/notify_settings/change", "enable_desktop_notifications")
self.check_for_toggle_param("/json/notify_settings/change", "enable_stream_desktop_notifications")
self.check_for_toggle_param("/json/notify_settings/change", "enable_stream_sounds")
@ -1005,19 +1080,24 @@ class ChangeSettingsTest(AuthedTestCase):
self.check_for_toggle_param("/json/notify_settings/change", "enable_digest_emails")
def test_ui_settings(self):
# type: () -> None
self.check_for_toggle_param("/json/ui_settings/change", "autoscroll_forever")
self.check_for_toggle_param("/json/ui_settings/change", "default_desktop_notifications")
def test_toggling_left_side_userlist(self):
# type: () -> None
self.check_for_toggle_param("/json/left_side_userlist", "left_side_userlist")
def test_time_setting(self):
# type: () -> None
self.check_for_toggle_param("/json/time_setting", "twenty_four_hour_time")
def test_enter_sends_setting(self):
# type: () -> None
self.check_for_toggle_param('/json/users/me/enter-sends', "enter_sends")
def test_missing_params(self):
# type: () -> None
"""
full_name is a required POST parameter for json_change_settings.
(enable_desktop_notifications is false by default, and password is
@ -1030,6 +1110,7 @@ class ChangeSettingsTest(AuthedTestCase):
"Missing '%s' argument" % ("full_name",))
def test_mismatching_passwords(self):
# type: () -> None
"""
new_password and confirm_password must match
"""
@ -1039,6 +1120,7 @@ class ChangeSettingsTest(AuthedTestCase):
"New password must match confirmation password!")
def test_wrong_old_password(self):
# type: () -> None
"""
new_password and confirm_password must match
"""
@ -1049,11 +1131,13 @@ class ChangeSettingsTest(AuthedTestCase):
class GetProfileTest(AuthedTestCase):
def common_update_pointer(self, email, pointer):
# type: (text_type, int) -> None
self.login(email)
result = self.client_put("/json/users/me/pointer", {"pointer": pointer})
self.assert_json_success(result)
def common_get_profile(self, email):
# type: (str) -> Dict[text_type, Any]
user_profile = get_user_profile_by_email(email)
self.send_message(email, "Verona", Recipient.STREAM, "hello")
@ -1072,6 +1156,7 @@ class GetProfileTest(AuthedTestCase):
return json
def test_cache_behavior(self):
# type: () -> None
with queries_captured() as queries:
with simulated_empty_cache() as cache_queries:
user_profile = get_user_profile_by_email('hamlet@zulip.com')
@ -1081,6 +1166,7 @@ class GetProfileTest(AuthedTestCase):
self.assertEqual(user_profile.email, 'hamlet@zulip.com')
def test_api_get_empty_profile(self):
# type: () -> None
"""
Ensure GET /users/me returns a max message id and returns successfully
"""
@ -1088,6 +1174,7 @@ class GetProfileTest(AuthedTestCase):
self.assertEqual(json["pointer"], -1)
def test_profile_with_pointer(self):
# type: () -> None
"""
Ensure GET /users/me returns a proper pointer id after the pointer is updated
"""
@ -1109,6 +1196,7 @@ class GetProfileTest(AuthedTestCase):
self.assert_json_error(result, "Invalid message ID")
def test_get_all_profiles_avatar_urls(self):
# type: () -> None
user_profile = get_user_profile_by_email('hamlet@zulip.com')
result = self.client.get("/api/v1/users", **self.api_auth('hamlet@zulip.com'))
self.assert_json_success(result)
@ -1123,6 +1211,7 @@ class GetProfileTest(AuthedTestCase):
class UserPresenceTests(AuthedTestCase):
def test_get_empty(self):
# type: () -> None
self.login("hamlet@zulip.com")
result = self.client.post("/json/get_active_statuses")
@ -1132,11 +1221,13 @@ class UserPresenceTests(AuthedTestCase):
self.assertEqual(presence, {})
def test_set_idle(self):
# type: () -> None
email = "hamlet@zulip.com"
self.login(email)
client = 'website'
def test_result(result):
# type: (HttpResponse) -> datetime.datetime
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'][email][client]['status'], 'idle')
@ -1164,6 +1255,7 @@ class UserPresenceTests(AuthedTestCase):
self.assertGreaterEqual(newer_timestamp, timestamp)
def test_set_active(self):
# type: () -> None
self.login("hamlet@zulip.com")
client = 'website'
@ -1191,7 +1283,8 @@ class UserPresenceTests(AuthedTestCase):
self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle')
def test_no_mit(self):
# MIT never gets a list of users
# type: () -> None
"""MIT never gets a list of users"""
self.login("espuser@mit.edu")
result = self.client.post("/json/users/me/presence", {'status': 'idle'})
self.assert_json_success(result)
@ -1199,6 +1292,7 @@ class UserPresenceTests(AuthedTestCase):
self.assertEqual(json['presences'], {})
def test_same_realm(self):
# type: () -> None
self.login("espuser@mit.edu")
self.client.post("/json/users/me/presence", {'status': 'idle'})
result = self.client.post("/accounts/logout/")
@ -1217,6 +1311,7 @@ class AlertWordTests(AuthedTestCase):
interesting_alert_word_list = ['alert', 'multi-word word', u'']
def test_internal_endpoint(self):
# type: () -> None
email = "cordelia@zulip.com"
self.login(email)
@ -1231,6 +1326,7 @@ class AlertWordTests(AuthedTestCase):
def test_default_no_words(self):
# type: () -> None
"""
Users start out with no alert words.
"""
@ -1242,6 +1338,7 @@ class AlertWordTests(AuthedTestCase):
self.assertEqual(words, [])
def test_add_word(self):
# type: () -> None
"""
add_user_alert_words can add multiple alert words at once.
"""
@ -1255,6 +1352,7 @@ class AlertWordTests(AuthedTestCase):
self.assertEqual(words, self.interesting_alert_word_list)
def test_remove_word(self):
# type: () -> None
"""
Removing alert words works via remove_user_alert_words, even
for multi-word and non-ascii words.
@ -1274,6 +1372,7 @@ class AlertWordTests(AuthedTestCase):
theoretical_remaining_alerts)
def test_realm_words(self):
# type: () -> None
"""
We can gather alert words for an entire realm via
alert_words_in_realm. Alerts added for one user do not impact other
@ -1296,6 +1395,7 @@ class AlertWordTests(AuthedTestCase):
self.assertEqual(realm_words[user2.id], ['another'])
def test_json_list_default(self):
# type: () -> None
self.login("hamlet@zulip.com")
result = self.client.get('/json/users/me/alert_words')
@ -1305,6 +1405,7 @@ class AlertWordTests(AuthedTestCase):
self.assertEqual(data['alert_words'], [])
def test_json_list_add(self):
# type: () -> None
self.login("hamlet@zulip.com")
result = self.client_put('/json/users/me/alert_words', {'alert_words': ujson.dumps(['one', 'two', 'three'])})
@ -1317,6 +1418,7 @@ class AlertWordTests(AuthedTestCase):
self.assertEqual(data['alert_words'], ['one', 'two', 'three'])
def test_json_list_remove(self):
# type: () -> None
self.login("hamlet@zulip.com")
result = self.client_put('/json/users/me/alert_words', {'alert_words': ujson.dumps(['one', 'two', 'three'])})
@ -1331,6 +1433,7 @@ class AlertWordTests(AuthedTestCase):
self.assertEqual(data['alert_words'], ['two', 'three'])
def test_json_list_set(self):
# type: () -> None
self.login("hamlet@zulip.com")
result = self.client_put('/json/users/me/alert_words', {'alert_words': ujson.dumps(['one', 'two', 'three'])})
@ -1345,12 +1448,14 @@ class AlertWordTests(AuthedTestCase):
self.assertEqual(data['alert_words'], ['a', 'b', 'c'])
def message_does_alert(self, user_profile, message):
# Send a bunch of messages as othello, so Hamlet is notified
# type: (UserProfile, text_type) -> bool
"""Send a bunch of messages as othello, so Hamlet is notified"""
self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, message)
user_message = most_recent_usermessage(user_profile)
return 'has_alert_word' in user_message.flags_list()
def test_alert_flags(self):
# type: () -> None
self.login("hamlet@zulip.com")
user_profile_hamlet = get_user_profile_by_email("hamlet@zulip.com")
@ -1383,6 +1488,7 @@ class AlertWordTests(AuthedTestCase):
class MutedTopicsTests(AuthedTestCase):
def test_json_set(self):
# type: () -> None
email = 'hamlet@zulip.com'
self.login(email)
@ -1404,6 +1510,8 @@ class MutedTopicsTests(AuthedTestCase):
class ExtractedRecipientsTest(TestCase):
def test_extract_recipients(self):
# type: () -> None
# JSON list w/dups, empties, and trailing whitespace
s = ujson.dumps([' alice@zulip.com ', ' bob@zulip.com ', ' ', 'bob@zulip.com'])
self.assertItemsEqual(extract_recipients(s), ['alice@zulip.com', 'bob@zulip.com'])
@ -1427,6 +1535,7 @@ class ExtractedRecipientsTest(TestCase):
class TestMissedMessages(AuthedTestCase):
def test_extra_context_in_missed_stream_messages(self):
# type: () -> None
self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '0')
self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '1')
self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '2')
@ -1445,6 +1554,7 @@ class TestMissedMessages(AuthedTestCase):
handle_missedmessage_emails(hamlet.id, [{'message_id': msg_id}])
def normalize_string(s):
# type: (text_type) -> text_type
s = s.strip()
return re.sub(r'\s+', ' ', s)
@ -1456,6 +1566,7 @@ class TestMissedMessages(AuthedTestCase):
class TestOpenRealms(AuthedTestCase):
def test_open_realm_logic(self):
# type: () -> None
mit_realm = get_realm("mit.edu")
self.assertEquals(get_unique_open_realm(), None)
mit_realm.restricted_to_domain = False