# -*- coding: utf-8 -*- from __future__ import absolute_import from __future__ import print_function from django.test import TestCase from zerver.lib.test_helpers import ( queries_captured, simulated_empty_cache, simulated_queue_client, tornado_redirected_to_list, AuthedTestCase, most_recent_usermessage, most_recent_message, ) from zerver.models import UserProfile, Recipient, \ Realm, Client, UserActivity, \ get_user_profile_by_email, split_email_to_domain, get_realm, \ get_client, get_stream, Message, get_unique_open_realm, \ completely_open from zerver.lib.avatar import get_avatar_url from zerver.lib.initial_password import initial_password from zerver.lib.actions import \ get_emails_from_user_ids, do_deactivate_user, do_reactivate_user, \ do_change_is_admin, extract_recipients, \ do_set_realm_name, do_deactivate_realm, \ do_add_subscription, do_remove_subscription, do_make_stream_private from zerver.lib.alert_words import alert_words_in_realm, user_alert_words, \ add_user_alert_words, remove_user_alert_words from zerver.lib.notifications import handle_missedmessage_emails from zerver.lib.session_user import get_session_dict_user from zerver.middleware import is_slow_query from zerver.worker import queue_processors from django.conf import settings from django.core import mail import datetime import os import re import sys import time import ujson def bail(msg): print('\nERROR: %s\n' % (msg,)) sys.exit(1) try: settings.TEST_SUITE except: bail('Test suite only runs correctly with --settings=zproject.test_settings') # Even though we don't use pygments directly in this file, we need # this import. try: import pygments except ImportError: bail('The Pygments library is required to run the backend test suite.') def find_dict(lst, k, v): for dct in lst: if dct[k] == v: return dct raise Exception('Cannot find element in list where key %s == %s' % (k, v)) class SlowQueryTest(TestCase): def test_is_slow_query(self): self.assertFalse(is_slow_query(1.1, '/some/random/url')) self.assertTrue(is_slow_query(2, '/some/random/url')) self.assertTrue(is_slow_query(5.1, '/activity')) self.assertFalse(is_slow_query(2, '/activity')) self.assertFalse(is_slow_query(2, '/json/report_error')) self.assertFalse(is_slow_query(2, '/api/v1/deployments/report_error')) self.assertFalse(is_slow_query(2, '/realm_activity/whatever')) self.assertFalse(is_slow_query(2, '/user_activity/whatever')) self.assertFalse(is_slow_query(9, '/accounts/webathena_kerberos_login/')) self.assertTrue(is_slow_query(11, '/accounts/webathena_kerberos_login/')) class RealmTest(AuthedTestCase): def assert_user_profile_cache_gets_new_name(self, email, new_realm_name): user_profile = get_user_profile_by_email(email) self.assertEqual(user_profile.realm.name, new_realm_name) def test_do_set_realm_name_caching(self): # The main complicated thing about setting realm names is fighting the # cache, and we start by populating the cache for Hamlet, and we end # by checking the cache to ensure that the new value is there. get_user_profile_by_email('hamlet@zulip.com') realm = get_realm('zulip.com') new_name = 'Zed You Elle Eye Pea' do_set_realm_name(realm, new_name) self.assertEqual(get_realm(realm.domain).name, new_name) self.assert_user_profile_cache_gets_new_name('hamlet@zulip.com', new_name) def test_do_set_realm_name_events(self): realm = get_realm('zulip.com') new_name = 'Puliz' events = [] with tornado_redirected_to_list(events): do_set_realm_name(realm, new_name) event = events[0]['event'] self.assertEqual(event, dict( type = 'realm', op = 'update', property = 'name', value = new_name, )) def test_realm_name_api(self): new_name = 'Zulip: Worldwide Exporter of APIs' email = 'cordelia@zulip.com' self.login(email) user_profile = get_user_profile_by_email(email) do_change_is_admin(user_profile, True) req = dict(name=ujson.dumps(new_name)) result = self.client_patch('/json/realm', req) self.assert_json_success(result) realm = get_realm('zulip.com') self.assertEqual(realm.name, new_name) def test_admin_restrictions_for_changing_realm_name(self): new_name = 'Mice will play while the cat is away' email = 'othello@zulip.com' self.login(email) user_profile = get_user_profile_by_email(email) do_change_is_admin(user_profile, False) req = dict(name=ujson.dumps(new_name)) result = self.client_patch('/json/realm', req) self.assert_json_error(result, 'Must be a realm administrator') def test_do_deactivate_realm(self): # The main complicated thing about deactivating realm names is updating the # cache, and we start by populating the cache for Hamlet, and we end # by checking the cache to ensure that his realm appears to be deactivated. # You can make this test fail by disabling cache.flush_realm(). get_user_profile_by_email('hamlet@zulip.com') realm = get_realm('zulip.com') do_deactivate_realm(realm) user = get_user_profile_by_email('hamlet@zulip.com') self.assertTrue(user.realm.deactivated) class PermissionTest(AuthedTestCase): def test_get_admin_users(self): user_profile = get_user_profile_by_email('hamlet@zulip.com') do_change_is_admin(user_profile, False) admin_users = user_profile.realm.get_admin_users() self.assertFalse(user_profile in admin_users) do_change_is_admin(user_profile, True) admin_users = user_profile.realm.get_admin_users() self.assertTrue(user_profile in admin_users) def test_admin_api(self): self.login('hamlet@zulip.com') admin = get_user_profile_by_email('hamlet@zulip.com') user = get_user_profile_by_email('othello@zulip.com') realm = admin.realm do_change_is_admin(admin, True) # Make sure we see is_admin flag in /json/users result = self.client.get('/json/users') self.assert_json_success(result) members = ujson.loads(result.content)['members'] hamlet = find_dict(members, 'email', 'hamlet@zulip.com') self.assertTrue(hamlet['is_admin']) othello = find_dict(members, 'email', 'othello@zulip.com') self.assertFalse(othello['is_admin']) # Giveth req = dict(is_admin=ujson.dumps(True)) events = [] with tornado_redirected_to_list(events): result = self.client_patch('/json/users/othello@zulip.com', req) self.assert_json_success(result) admin_users = realm.get_admin_users() self.assertTrue(user in admin_users) person = events[0]['event']['person'] self.assertEqual(person['email'], 'othello@zulip.com') self.assertEqual(person['is_admin'], True) # Taketh away req = dict(is_admin=ujson.dumps(False)) events = [] with tornado_redirected_to_list(events): result = self.client_patch('/json/users/othello@zulip.com', req) self.assert_json_success(result) admin_users = realm.get_admin_users() self.assertFalse(user in admin_users) person = events[0]['event']['person'] self.assertEqual(person['email'], 'othello@zulip.com') self.assertEqual(person['is_admin'], False) # Make sure only admins can patch other user's info. self.login('othello@zulip.com') result = self.client_patch('/json/users/hamlet@zulip.com', req) self.assert_json_error(result, 'Insufficient permission') class WorkerTest(TestCase): class FakeClient(object): def __init__(self): self.consumers = {} self.queue = [] def register_json_consumer(self, queue_name, callback): self.consumers[queue_name] = callback def start_consuming(self): for queue_name, data in self.queue: callback = self.consumers[queue_name] callback(data) def test_UserActivityWorker(self): fake_client = self.FakeClient() user = get_user_profile_by_email('hamlet@zulip.com') UserActivity.objects.filter( user_profile = user.id, client = get_client('ios') ).delete() data = dict( user_profile_id = user.id, client = 'ios', time = time.time(), query = 'send_message' ) fake_client.queue.append(('user_activity', data)) with simulated_queue_client(lambda: fake_client): worker = queue_processors.UserActivityWorker() worker.setup() worker.start() activity_records = UserActivity.objects.filter( user_profile = user.id, client = get_client('ios') ) self.assertTrue(len(activity_records), 1) self.assertTrue(activity_records[0].count, 1) def test_error_handling(self): processed = [] @queue_processors.assign_queue('unreliable_worker') class UnreliableWorker(queue_processors.QueueProcessingWorker): def consume(self, data): if data == 'unexpected behaviour': raise Exception('Worker task not performing as expected!') processed.append(data) def _log_problem(self): # keep the tests quiet pass fake_client = self.FakeClient() for msg in ['good', 'fine', 'unexpected behaviour', 'back to normal']: fake_client.queue.append(('unreliable_worker', msg)) fn = os.path.join(settings.QUEUE_ERROR_DIR, 'unreliable_worker.errors') try: os.remove(fn) except OSError: pass with simulated_queue_client(lambda: fake_client): worker = UnreliableWorker() worker.setup() worker.start() self.assertEqual(processed, ['good', 'fine', 'back to normal']) line = open(fn).readline().strip() event = ujson.loads(line.split('\t')[1]) self.assertEqual(event, 'unexpected behaviour') def test_worker_noname(self): class TestWorker(queue_processors.QueueProcessingWorker): def __init__(self): super(TestWorker, self).__init__() def consume(self, data): pass with self.assertRaises(queue_processors.WorkerDeclarationException): TestWorker() def test_worker_noconsume(self): @queue_processors.assign_queue('test_worker') class TestWorker(queue_processors.QueueProcessingWorker): def __init__(self): super(TestWorker, self).__init__() with self.assertRaises(queue_processors.WorkerDeclarationException): worker = TestWorker() worker.consume({}) class ActivityTest(AuthedTestCase): def test_activity(self): self.login("hamlet@zulip.com") client, _ = Client.objects.get_or_create(name='website') query = '/json/update_pointer' last_visit = datetime.datetime.now() count=150 for user_profile in UserProfile.objects.all(): UserActivity.objects.get_or_create( user_profile=user_profile, client=client, query=query, count=count, last_visit=last_visit ) with queries_captured() as queries: self.client.get('/activity') self.assert_length(queries, 12) class UserProfileTest(TestCase): def test_get_emails_from_user_ids(self): hamlet = get_user_profile_by_email('hamlet@zulip.com') othello = get_user_profile_by_email('othello@zulip.com') dct = get_emails_from_user_ids([hamlet.id, othello.id]) self.assertEqual(dct[hamlet.id], 'hamlet@zulip.com') self.assertEqual(dct[othello.id], 'othello@zulip.com') class UserChangesTest(AuthedTestCase): def test_update_api_key(self): email = "hamlet@zulip.com" self.login(email) user = get_user_profile_by_email(email) old_api_key = user.api_key result = self.client.post('/json/users/me/api_key/regenerate') self.assert_json_success(result) new_api_key = ujson.loads(result.content)['api_key'] self.assertNotEqual(old_api_key, new_api_key) user = get_user_profile_by_email(email) self.assertEqual(new_api_key, user.api_key) class ActivateTest(AuthedTestCase): def test_basics(self): user = get_user_profile_by_email('hamlet@zulip.com') do_deactivate_user(user) self.assertFalse(user.is_active) do_reactivate_user(user) self.assertTrue(user.is_active) def test_api(self): admin = get_user_profile_by_email('othello@zulip.com') do_change_is_admin(admin, True) self.login('othello@zulip.com') user = get_user_profile_by_email('hamlet@zulip.com') self.assertTrue(user.is_active) result = self.client_delete('/json/users/hamlet@zulip.com') self.assert_json_success(result) user = get_user_profile_by_email('hamlet@zulip.com') self.assertFalse(user.is_active) result = self.client.post('/json/users/hamlet@zulip.com/reactivate') self.assert_json_success(result) user = get_user_profile_by_email('hamlet@zulip.com') self.assertTrue(user.is_active) # Can not deactivate a user as a bot result = self.client_delete('/json/bots/hamlet@zulip.com') self.assert_json_error(result, 'No such bot') class BotTest(AuthedTestCase): def assert_num_bots_equal(self, count): result = self.client.get("/json/bots") self.assert_json_success(result) json = ujson.loads(result.content) self.assertEqual(count, len(json['bots'])) def create_bot(self, **extras): bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } bot_info.update(extras) result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) return ujson.loads(result.content) def deactivate_bot(self): result = self.client_delete("/json/bots/hambot-bot@zulip.com") self.assert_json_success(result) def test_add_bot(self): self.login("hamlet@zulip.com") self.assert_num_bots_equal(0) events = [] with tornado_redirected_to_list(events): result = self.create_bot() self.assert_num_bots_equal(1) event = [e for e in events if e['event']['type'] == 'realm_bot'][0] self.assertEqual( dict( type='realm_bot', op='add', bot=dict(email='hambot-bot@zulip.com', full_name='The Bot of Hamlet', api_key=result['api_key'], avatar_url=result['avatar_url'], default_sending_stream=None, default_events_register_stream=None, default_all_public_streams=False, owner='hamlet@zulip.com', ) ), event['event'] ) def test_add_bot_with_default_sending_stream(self): self.login("hamlet@zulip.com") self.assert_num_bots_equal(0) result = self.create_bot(default_sending_stream='Denmark') self.assert_num_bots_equal(1) self.assertEqual(result['default_sending_stream'], 'Denmark') profile = get_user_profile_by_email('hambot-bot@zulip.com') self.assertEqual(profile.default_sending_stream.name, 'Denmark') def test_add_bot_with_default_sending_stream_not_subscribed(self): self.login("hamlet@zulip.com") self.assert_num_bots_equal(0) result = self.create_bot(default_sending_stream='Rome') self.assert_num_bots_equal(1) self.assertEqual(result['default_sending_stream'], 'Rome') profile = get_user_profile_by_email('hambot-bot@zulip.com') self.assertEqual(profile.default_sending_stream.name, 'Rome') def test_add_bot_with_default_sending_stream_private_allowed(self): self.login("hamlet@zulip.com") user_profile = get_user_profile_by_email("hamlet@zulip.com") stream = get_stream("Denmark", user_profile.realm) do_add_subscription(user_profile, stream) do_make_stream_private(user_profile.realm, "Denmark") self.assert_num_bots_equal(0) events = [] with tornado_redirected_to_list(events): result = self.create_bot(default_sending_stream='Denmark') self.assert_num_bots_equal(1) self.assertEqual(result['default_sending_stream'], 'Denmark') profile = get_user_profile_by_email('hambot-bot@zulip.com') self.assertEqual(profile.default_sending_stream.name, 'Denmark') event = [e for e in events if e['event']['type'] == 'realm_bot'][0] self.assertEqual( dict( type='realm_bot', op='add', bot=dict(email='hambot-bot@zulip.com', full_name='The Bot of Hamlet', api_key=result['api_key'], avatar_url=result['avatar_url'], default_sending_stream='Denmark', default_events_register_stream=None, default_all_public_streams=False, owner='hamlet@zulip.com', ) ), event['event'] ) self.assertEqual(event['users'], (user_profile.id,)) def test_add_bot_with_default_sending_stream_private_denied(self): self.login("hamlet@zulip.com") user_profile = get_user_profile_by_email("hamlet@zulip.com") stream = get_stream("Denmark", user_profile.realm) do_remove_subscription(user_profile, stream) do_make_stream_private(user_profile.realm, "Denmark") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', 'default_sending_stream': 'Denmark', } result = self.client.post("/json/bots", bot_info) self.assert_json_error(result, 'Insufficient permission') def test_add_bot_with_default_events_register_stream(self): self.login("hamlet@zulip.com") self.assert_num_bots_equal(0) result = self.create_bot(default_events_register_stream='Denmark') self.assert_num_bots_equal(1) self.assertEqual(result['default_events_register_stream'], 'Denmark') profile = get_user_profile_by_email('hambot-bot@zulip.com') self.assertEqual(profile.default_events_register_stream.name, 'Denmark') def test_add_bot_with_default_events_register_stream_private_allowed(self): self.login("hamlet@zulip.com") user_profile = get_user_profile_by_email("hamlet@zulip.com") stream = get_stream("Denmark", user_profile.realm) do_add_subscription(user_profile, stream) do_make_stream_private(user_profile.realm, "Denmark") self.assert_num_bots_equal(0) events = [] with tornado_redirected_to_list(events): result = self.create_bot(default_events_register_stream='Denmark') self.assert_num_bots_equal(1) self.assertEqual(result['default_events_register_stream'], 'Denmark') profile = get_user_profile_by_email('hambot-bot@zulip.com') self.assertEqual(profile.default_events_register_stream.name, 'Denmark') event = [e for e in events if e['event']['type'] == 'realm_bot'][0] self.assertEqual( dict( type='realm_bot', op='add', bot=dict(email='hambot-bot@zulip.com', full_name='The Bot of Hamlet', api_key=result['api_key'], avatar_url=result['avatar_url'], default_sending_stream=None, default_events_register_stream='Denmark', default_all_public_streams=False, owner='hamlet@zulip.com', ) ), event['event'] ) self.assertEqual(event['users'], (user_profile.id,)) def test_add_bot_with_default_events_register_stream_private_denied(self): self.login("hamlet@zulip.com") user_profile = get_user_profile_by_email("hamlet@zulip.com") stream = get_stream("Denmark", user_profile.realm) do_remove_subscription(user_profile, stream) do_make_stream_private(user_profile.realm, "Denmark") self.assert_num_bots_equal(0) bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', 'default_events_register_stream': 'Denmark', } result = self.client.post("/json/bots", bot_info) self.assert_json_error(result, 'Insufficient permission') def test_add_bot_with_default_all_public_streams(self): self.login("hamlet@zulip.com") self.assert_num_bots_equal(0) result = self.create_bot(default_all_public_streams=ujson.dumps(True)) self.assert_num_bots_equal(1) self.assertTrue(result['default_all_public_streams']) profile = get_user_profile_by_email('hambot-bot@zulip.com') self.assertEqual(profile.default_all_public_streams, True) def test_deactivate_bot(self): self.login("hamlet@zulip.com") self.assert_num_bots_equal(0) self.create_bot() self.assert_num_bots_equal(1) self.deactivate_bot() # You can deactivate the same bot twice. self.deactivate_bot() self.assert_num_bots_equal(0) def test_deactivate_bogus_bot(self): # Deleting a bogus bot will succeed silently. self.login("hamlet@zulip.com") self.assert_num_bots_equal(0) self.create_bot() self.assert_num_bots_equal(1) result = self.client_delete("/json/bots/bogus-bot@zulip.com") self.assert_json_error(result, 'No such bot') self.assert_num_bots_equal(1) def test_bot_deactivation_attacks(self): # You cannot deactivate somebody else's bot. self.login("hamlet@zulip.com") self.assert_num_bots_equal(0) self.create_bot() self.assert_num_bots_equal(1) # Have Othello try to deactivate both Hamlet and # Hamlet's bot. self.login("othello@zulip.com") # Can not deactivate a user as a bot result = self.client_delete("/json/bots/hamlet@zulip.com") self.assert_json_error(result, 'No such bot') result = self.client_delete("/json/bots/hambot-bot@zulip.com") self.assert_json_error(result, 'Insufficient permission') # But we don't actually deactivate the other person's bot. self.login("hamlet@zulip.com") self.assert_num_bots_equal(1) # Can not deactivate a bot as a user result = self.client_delete("/json/users/hambot-bot@zulip.com") self.assert_json_error(result, 'No such user') self.assert_num_bots_equal(1) def test_bot_permissions(self): self.login("hamlet@zulip.com") self.assert_num_bots_equal(0) self.create_bot() self.assert_num_bots_equal(1) # Have Othello try to mess with Hamlet's bots. self.login("othello@zulip.com") result = self.client.post("/json/bots/hambot-bot@zulip.com/api_key/regenerate") self.assert_json_error(result, 'Insufficient permission') bot_info = { 'full_name': 'Fred', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_error(result, 'Insufficient permission') def get_bot(self): result = self.client.get("/json/bots") bots = ujson.loads(result.content)['bots'] return bots[0] def test_update_api_key(self): self.login("hamlet@zulip.com") self.create_bot() bot = self.get_bot() old_api_key = bot['api_key'] result = self.client.post('/json/bots/hambot-bot@zulip.com/api_key/regenerate') self.assert_json_success(result) new_api_key = ujson.loads(result.content)['api_key'] self.assertNotEqual(old_api_key, new_api_key) bot = self.get_bot() self.assertEqual(new_api_key, bot['api_key']) def test_patch_bot_full_name(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'full_name': 'Fred', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) full_name = ujson.loads(result.content)['full_name'] self.assertEqual('Fred', full_name) bot = self.get_bot() self.assertEqual('Fred', bot['full_name']) def test_patch_bot_to_stream(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_sending_stream': 'Denmark', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) default_sending_stream = ujson.loads(result.content)['default_sending_stream'] self.assertEqual('Denmark', default_sending_stream) bot = self.get_bot() self.assertEqual('Denmark', bot['default_sending_stream']) def test_patch_bot_to_stream_not_subscribed(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_sending_stream': 'Rome', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) default_sending_stream = ujson.loads(result.content)['default_sending_stream'] self.assertEqual('Rome', default_sending_stream) bot = self.get_bot() self.assertEqual('Rome', bot['default_sending_stream']) def test_patch_bot_to_stream_none(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_sending_stream': '', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) default_sending_stream = ujson.loads(result.content)['default_sending_stream'] self.assertEqual(None, default_sending_stream) bot = self.get_bot() self.assertEqual(None, bot['default_sending_stream']) def test_patch_bot_to_stream_private_allowed(self): self.login("hamlet@zulip.com") user_profile = get_user_profile_by_email("hamlet@zulip.com") stream = get_stream("Denmark", user_profile.realm) do_add_subscription(user_profile, stream) do_make_stream_private(user_profile.realm, "Denmark") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_sending_stream': 'Denmark', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) default_sending_stream = ujson.loads(result.content)['default_sending_stream'] self.assertEqual('Denmark', default_sending_stream) bot = self.get_bot() self.assertEqual('Denmark', bot['default_sending_stream']) def test_patch_bot_to_stream_private_denied(self): self.login("hamlet@zulip.com") user_profile = get_user_profile_by_email("hamlet@zulip.com") stream = get_stream("Denmark", user_profile.realm) do_remove_subscription(user_profile, stream) do_make_stream_private(user_profile.realm, "Denmark") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_sending_stream': 'Denmark', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_error(result, 'Insufficient permission') def test_patch_bot_to_stream_not_found(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_sending_stream': 'missing', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_error(result, 'No such stream \'missing\'') def test_patch_bot_events_register_stream(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_events_register_stream': 'Denmark', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) default_events_register_stream = ujson.loads(result.content)['default_events_register_stream'] self.assertEqual('Denmark', default_events_register_stream) bot = self.get_bot() self.assertEqual('Denmark', bot['default_events_register_stream']) def test_patch_bot_events_register_stream_allowed(self): self.login("hamlet@zulip.com") user_profile = get_user_profile_by_email("hamlet@zulip.com") stream = get_stream("Denmark", user_profile.realm) do_add_subscription(user_profile, stream) do_make_stream_private(user_profile.realm, "Denmark") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_events_register_stream': 'Denmark', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) default_events_register_stream = ujson.loads(result.content)['default_events_register_stream'] self.assertEqual('Denmark', default_events_register_stream) bot = self.get_bot() self.assertEqual('Denmark', bot['default_events_register_stream']) def test_patch_bot_events_register_stream_denied(self): self.login("hamlet@zulip.com") user_profile = get_user_profile_by_email("hamlet@zulip.com") stream = get_stream("Denmark", user_profile.realm) do_remove_subscription(user_profile, stream) do_make_stream_private(user_profile.realm, "Denmark") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_events_register_stream': 'Denmark', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_error(result, 'Insufficient permission') def test_patch_bot_events_register_stream_none(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_events_register_stream': '', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) default_events_register_stream = ujson.loads(result.content)['default_events_register_stream'] self.assertEqual(None, default_events_register_stream) bot = self.get_bot() self.assertEqual(None, bot['default_events_register_stream']) def test_patch_bot_events_register_stream_not_found(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_events_register_stream': 'missing', } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_error(result, 'No such stream \'missing\'') def test_patch_bot_default_all_public_streams_true(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_all_public_streams': ujson.dumps(True), } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) default_events_register_stream = ujson.loads(result.content)['default_all_public_streams'] self.assertEqual(default_events_register_stream, True) bot = self.get_bot() self.assertEqual(bot['default_all_public_streams'], True) def test_patch_bot_default_all_public_streams_false(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'default_all_public_streams': ujson.dumps(False), } result = self.client_patch("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) default_events_register_stream = ujson.loads(result.content)['default_all_public_streams'] self.assertEqual(default_events_register_stream, False) bot = self.get_bot() self.assertEqual(bot['default_all_public_streams'], False) def test_patch_bot_via_post(self): self.login("hamlet@zulip.com") bot_info = { 'full_name': 'The Bot of Hamlet', 'short_name': 'hambot', } result = self.client.post("/json/bots", bot_info) self.assert_json_success(result) bot_info = { 'full_name': 'Fred', 'method': 'PATCH' } result = self.client.post("/json/bots/hambot-bot@zulip.com", bot_info) self.assert_json_success(result) full_name = ujson.loads(result.content)['full_name'] self.assertEqual('Fred', full_name) bot = self.get_bot() self.assertEqual('Fred', bot['full_name']) def test_patch_bogus_bot(self): # Deleting a bogus bot will succeed silently. self.login("hamlet@zulip.com") self.create_bot() bot_info = { 'full_name': 'Fred', } result = self.client_patch("/json/bots/nonexistent-bot@zulip.com", bot_info) self.assert_json_error(result, 'No such user') self.assert_num_bots_equal(1) class ChangeSettingsTest(AuthedTestCase): def post_with_params(self, modified_params): post_params = {"full_name": "Foo Bar", "old_password": initial_password("hamlet@zulip.com"), "new_password": "foobar1", "confirm_password": "foobar1", } post_params.update(modified_params) return self.client.post("/json/settings/change", dict(post_params)) def check_well_formed_change_settings_response(self, result): self.assertIn("full_name", result) def test_successful_change_settings(self): """ A call to /json/settings/change with valid parameters changes the user's settings correctly and returns correct values. """ self.login("hamlet@zulip.com") json_result = self.post_with_params({}) self.assert_json_success(json_result) result = ujson.loads(json_result.content) self.check_well_formed_change_settings_response(result) self.assertEqual(get_user_profile_by_email("hamlet@zulip.com"). full_name, "Foo Bar") self.client.post('/accounts/logout/') self.login("hamlet@zulip.com", "foobar1") user_profile = get_user_profile_by_email('hamlet@zulip.com') self.assertEqual(get_session_dict_user(self.client.session), user_profile.id) def test_notify_settings(self): # This is basically a don't-explode test. self.login("hamlet@zulip.com") json_result = self.client.post("/json/notify_settings/change", {"enable_desktop_notifications": ujson.dumps(False)}) self.assert_json_success(json_result) self.assertEqual(get_user_profile_by_email("hamlet@zulip.com"). enable_desktop_notifications, False) def test_ui_settings(self): self.login("hamlet@zulip.com") json_result = self.client.post("/json/ui_settings/change", {"autoscroll_forever": ujson.dumps(True)}) self.assert_json_success(json_result) self.assertEqual(get_user_profile_by_email("hamlet@zulip.com"). enable_desktop_notifications, True) json_result = self.client.post("/json/ui_settings/change", {"autoscroll_forever": ujson.dumps(False)}) self.assert_json_success(json_result) self.assertEqual(get_user_profile_by_email("hamlet@zulip.com"). autoscroll_forever, False) json_result = self.client.post("/json/ui_settings/change", {"default_desktop_notifications": ujson.dumps(True)}) self.assert_json_success(json_result) self.assertEqual(get_user_profile_by_email("hamlet@zulip.com"). default_desktop_notifications, True) json_result = self.client.post("/json/ui_settings/change", {"default_desktop_notifications": ujson.dumps(False)}) self.assert_json_success(json_result) self.assertEqual(get_user_profile_by_email("hamlet@zulip.com"). default_desktop_notifications, False) def test_missing_params(self): """ full_name is a required POST parameter for json_change_settings. (enable_desktop_notifications is false by default, and password is only required if you are changing it) """ self.login("hamlet@zulip.com") result = self.client.post("/json/settings/change", {}) self.assert_json_error(result, "Missing '%s' argument" % ("full_name",)) def test_mismatching_passwords(self): """ new_password and confirm_password must match """ self.login("hamlet@zulip.com") result = self.post_with_params({"new_password": "mismatched_password"}) self.assert_json_error(result, "New password must match confirmation password!") def test_wrong_old_password(self): """ new_password and confirm_password must match """ self.login("hamlet@zulip.com") result = self.post_with_params({"old_password": "bad_password"}) self.assert_json_error(result, "Wrong password!") class GetProfileTest(AuthedTestCase): def common_update_pointer(self, email, pointer): self.login(email) result = self.client.post("/json/update_pointer", {"pointer": pointer}) self.assert_json_success(result) def common_get_profile(self, email): user_profile = get_user_profile_by_email(email) self.send_message(email, "Verona", Recipient.STREAM, "hello") result = self.client.get("/api/v1/users/me", **self.api_auth(email)) max_id = most_recent_message(user_profile).id self.assert_json_success(result) json = ujson.loads(result.content) self.assertIn("client_id", json) self.assertIn("max_message_id", json) self.assertIn("pointer", json) self.assertEqual(json["max_message_id"], max_id) return json def test_cache_behavior(self): with queries_captured() as queries: with simulated_empty_cache() as cache_queries: user_profile = get_user_profile_by_email('hamlet@zulip.com') self.assert_length(queries, 1) self.assert_length(cache_queries, 1, exact=True) self.assertEqual(user_profile.email, 'hamlet@zulip.com') def test_api_get_empty_profile(self): """ Ensure get_profile returns a max message id and returns successfully """ json = self.common_get_profile("othello@zulip.com") self.assertEqual(json["pointer"], -1) def test_profile_with_pointer(self): """ Ensure get_profile returns a proper pointer id after the pointer is updated """ id1 = self.send_message("othello@zulip.com", "Verona", Recipient.STREAM) id2 = self.send_message("othello@zulip.com", "Verona", Recipient.STREAM) json = self.common_get_profile("hamlet@zulip.com") self.common_update_pointer("hamlet@zulip.com", id2) json = self.common_get_profile("hamlet@zulip.com") self.assertEqual(json["pointer"], id2) self.common_update_pointer("hamlet@zulip.com", id1) json = self.common_get_profile("hamlet@zulip.com") self.assertEqual(json["pointer"], id2) # pointer does not move backwards result = self.client.post("/json/update_pointer", {"pointer": 99999999}) self.assert_json_error(result, "Invalid message ID") def test_get_all_profiles_avatar_urls(self): user_profile = get_user_profile_by_email('hamlet@zulip.com') result = self.client.get("/api/v1/users", **self.api_auth('hamlet@zulip.com')) self.assert_json_success(result) json = ujson.loads(result.content) for user in json['members']: if user['email'] == 'hamlet@zulip.com': self.assertEqual( user['avatar_url'], get_avatar_url(user_profile.avatar_source, user_profile.email), ) class UserPresenceTests(AuthedTestCase): def test_get_empty(self): self.login("hamlet@zulip.com") result = self.client.post("/json/get_active_statuses") self.assert_json_success(result) json = ujson.loads(result.content) for email, presence in json['presences'].items(): self.assertEqual(presence, {}) def test_set_idle(self): email = "hamlet@zulip.com" self.login(email) client = 'website' def test_result(result): self.assert_json_success(result) json = ujson.loads(result.content) self.assertEqual(json['presences'][email][client]['status'], 'idle') self.assertIn('timestamp', json['presences'][email][client]) self.assertIsInstance(json['presences'][email][client]['timestamp'], int) self.assertEqual(list(json['presences'].keys()), ['hamlet@zulip.com']) return json['presences'][email][client]['timestamp'] result = self.client.post("/json/update_active_status", {'status': 'idle'}) test_result(result) result = self.client.post("/json/get_active_statuses", {}) timestamp = test_result(result) email = "othello@zulip.com" self.login(email) self.client.post("/json/update_active_status", {'status': 'idle'}) result = self.client.post("/json/get_active_statuses", {}) self.assert_json_success(result) json = ujson.loads(result.content) self.assertEqual(json['presences'][email][client]['status'], 'idle') self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle') self.assertEqual(list(json['presences'].keys()), ['hamlet@zulip.com', 'othello@zulip.com']) newer_timestamp = json['presences'][email][client]['timestamp'] self.assertGreaterEqual(newer_timestamp, timestamp) def test_set_active(self): self.login("hamlet@zulip.com") client = 'website' self.client.post("/json/update_active_status", {'status': 'idle'}) result = self.client.post("/json/get_active_statuses", {}) self.assert_json_success(result) json = ujson.loads(result.content) self.assertEqual(json['presences']["hamlet@zulip.com"][client]['status'], 'idle') email = "othello@zulip.com" self.login("othello@zulip.com") self.client.post("/json/update_active_status", {'status': 'idle'}) result = self.client.post("/json/get_active_statuses", {}) self.assert_json_success(result) json = ujson.loads(result.content) self.assertEqual(json['presences'][email][client]['status'], 'idle') self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle') self.client.post("/json/update_active_status", {'status': 'active'}) result = self.client.post("/json/get_active_statuses", {}) self.assert_json_success(result) json = ujson.loads(result.content) self.assertEqual(json['presences'][email][client]['status'], 'active') self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle') def test_no_mit(self): # MIT never gets a list of users self.login("espuser@mit.edu") result = self.client.post("/json/update_active_status", {'status': 'idle'}) self.assert_json_success(result) json = ujson.loads(result.content) self.assertEqual(json['presences'], {}) def test_same_realm(self): self.login("espuser@mit.edu") self.client.post("/json/update_active_status", {'status': 'idle'}) result = self.client.post("/accounts/logout/") # Ensure we don't see hamlet@zulip.com information leakage self.login("hamlet@zulip.com") result = self.client.post("/json/update_active_status", {'status': 'idle'}) self.assert_json_success(result) json = ujson.loads(result.content) self.assertEqual(json['presences']["hamlet@zulip.com"]["website"]['status'], 'idle') # We only want @zulip.com emails for email in json['presences'].keys(): self.assertEqual(split_email_to_domain(email), 'zulip.com') class AlertWordTests(AuthedTestCase): interesting_alert_word_list = ['alert', 'multi-word word', u'☃'] def test_internal_endpoint(self): email = "cordelia@zulip.com" self.login(email) params = { 'alert_words': ujson.dumps(['milk', 'cookies']) } result = self.client.post('/json/set_alert_words', params) self.assert_json_success(result) user = get_user_profile_by_email(email) words = user_alert_words(user) self.assertEqual(words, ['milk', 'cookies']) def test_default_no_words(self): """ Users start out with no alert words. """ email = "cordelia@zulip.com" user = get_user_profile_by_email(email) words = user_alert_words(user) self.assertEqual(words, []) def test_add_word(self): """ add_user_alert_words can add multiple alert words at once. """ email = "cordelia@zulip.com" user = get_user_profile_by_email(email) # Add several words, including multi-word and non-ascii words. add_user_alert_words(user, self.interesting_alert_word_list) words = user_alert_words(user) self.assertEqual(words, self.interesting_alert_word_list) def test_remove_word(self): """ Removing alert words works via remove_user_alert_words, even for multi-word and non-ascii words. """ email = "cordelia@zulip.com" user = get_user_profile_by_email(email) add_user_alert_words(user, self.interesting_alert_word_list) theoretical_remaining_alerts = self.interesting_alert_word_list[:] for alert_word in self.interesting_alert_word_list: remove_user_alert_words(user, alert_word) theoretical_remaining_alerts.remove(alert_word) actual_remaining_alerts = user_alert_words(user) self.assertEqual(actual_remaining_alerts, theoretical_remaining_alerts) def test_realm_words(self): """ We can gather alert words for an entire realm via alert_words_in_realm. Alerts added for one user do not impact other users. """ email = "cordelia@zulip.com" user1 = get_user_profile_by_email(email) add_user_alert_words(user1, self.interesting_alert_word_list) email = "othello@zulip.com" user2 = get_user_profile_by_email(email) add_user_alert_words(user2, ['another']) realm_words = alert_words_in_realm(user2.realm) self.assertEqual(len(realm_words), 2) self.assertEqual(list(realm_words.keys()), [user1.id, user2.id]) self.assertEqual(realm_words[user1.id], self.interesting_alert_word_list) self.assertEqual(realm_words[user2.id], ['another']) def test_json_list_default(self): self.login("hamlet@zulip.com") result = self.client.get('/json/users/me/alert_words') self.assert_json_success(result) data = ujson.loads(result.content) self.assertEqual(data['alert_words'], []) def test_json_list_add(self): self.login("hamlet@zulip.com") result = self.client_patch('/json/users/me/alert_words', {'alert_words': ujson.dumps(['one', 'two', 'three'])}) self.assert_json_success(result) result = self.client.get('/json/users/me/alert_words') self.assert_json_success(result) data = ujson.loads(result.content) self.assertEqual(data['alert_words'], ['one', 'two', 'three']) def test_json_list_remove(self): self.login("hamlet@zulip.com") result = self.client_patch('/json/users/me/alert_words', {'alert_words': ujson.dumps(['one', 'two', 'three'])}) self.assert_json_success(result) result = self.client_delete('/json/users/me/alert_words', {'alert_words': ujson.dumps(['one'])}) self.assert_json_success(result) result = self.client.get('/json/users/me/alert_words') self.assert_json_success(result) data = ujson.loads(result.content) self.assertEqual(data['alert_words'], ['two', 'three']) def test_json_list_set(self): self.login("hamlet@zulip.com") result = self.client_patch('/json/users/me/alert_words', {'alert_words': ujson.dumps(['one', 'two', 'three'])}) self.assert_json_success(result) result = self.client_put('/json/users/me/alert_words', {'alert_words': ujson.dumps(['a', 'b', 'c'])}) self.assert_json_success(result) result = self.client.get('/json/users/me/alert_words') self.assert_json_success(result) data = ujson.loads(result.content) self.assertEqual(data['alert_words'], ['a', 'b', 'c']) def message_does_alert(self, user_profile, message): # Send a bunch of messages as othello, so Hamlet is notified self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, message) message = most_recent_usermessage(user_profile) return 'has_alert_word' in message.flags_list() def test_alert_flags(self): self.login("hamlet@zulip.com") user_profile_hamlet = get_user_profile_by_email("hamlet@zulip.com") result = self.client_patch('/json/users/me/alert_words', {'alert_words': ujson.dumps(['one', 'two', 'three'])}) self.assert_json_success(result) result = self.client.get('/json/users/me/alert_words') self.assert_json_success(result) data = ujson.loads(result.content) self.assertEqual(data['alert_words'], ['one', 'two', 'three']) # Alerts in the middle of messages work. self.assertTrue(self.message_does_alert(user_profile_hamlet, "Normal alert one time")) # Alerts at the end of messages work. self.assertTrue(self.message_does_alert(user_profile_hamlet, "Normal alert one")) # Alerts at the beginning of messages work. self.assertTrue(self.message_does_alert(user_profile_hamlet, "two normal alerts")) # Alerts with surrounding punctuation work. self.assertTrue(self.message_does_alert(user_profile_hamlet, "This one? should alert")) self.assertTrue(self.message_does_alert(user_profile_hamlet, "Definitely time for three.")) # Multiple alerts in a message work. self.assertTrue(self.message_does_alert(user_profile_hamlet, "One two three o'clock")) # Alerts are case-insensitive. self.assertTrue(self.message_does_alert(user_profile_hamlet, "One o'clock")) self.assertTrue(self.message_does_alert(user_profile_hamlet, "Case of ONE, won't stop me")) # We don't cause alerts for matches in URLs. self.assertFalse(self.message_does_alert(user_profile_hamlet, "Don't alert on http://t.co/one/ urls")) self.assertFalse(self.message_does_alert(user_profile_hamlet, "Don't alert on http://t.co/one urls")) class MutedTopicsTests(AuthedTestCase): def test_json_set(self): email = 'hamlet@zulip.com' self.login(email) url = '/json/set_muted_topics' data = {'muted_topics': '[["stream", "topic"]]'} result = self.client.post(url, data) self.assert_json_success(result) user = get_user_profile_by_email(email) self.assertEqual(ujson.loads(user.muted_topics), [["stream", "topic"]]) url = '/json/set_muted_topics' data = {'muted_topics': '[["stream2", "topic2"]]'} result = self.client.post(url, data) self.assert_json_success(result) user = get_user_profile_by_email(email) self.assertEqual(ujson.loads(user.muted_topics), [["stream2", "topic2"]]) class ExtractedRecipientsTest(TestCase): def test_extract_recipients(self): # JSON list w/dups, empties, and trailing whitespace s = ujson.dumps([' alice@zulip.com ', ' bob@zulip.com ', ' ', 'bob@zulip.com']) self.assertItemsEqual(extract_recipients(s), ['alice@zulip.com', 'bob@zulip.com']) # simple string with one name s = 'alice@zulip.com ' self.assertItemsEqual(extract_recipients(s), ['alice@zulip.com']) # JSON-encoded string s = '"alice@zulip.com"' self.assertItemsEqual(extract_recipients(s), ['alice@zulip.com']) # bare comma-delimited string s = 'bob@zulip.com, alice@zulip.com' self.assertItemsEqual(extract_recipients(s), ['alice@zulip.com', 'bob@zulip.com']) # JSON-encoded, comma-delimited string s = '"bob@zulip.com,alice@zulip.com"' self.assertItemsEqual(extract_recipients(s), ['alice@zulip.com', 'bob@zulip.com']) class TestMissedMessages(AuthedTestCase): def test_extra_context_in_missed_stream_messages(self): self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '0') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '1') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '2') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '3') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '4') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '5') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '6') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '7') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '8') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '9') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '10') self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '11', subject='test2') msg_id = self.send_message("othello@zulip.com", "denmark", Recipient.STREAM, '@**hamlet**') hamlet = get_user_profile_by_email('hamlet@zulip.com') handle_missedmessage_emails(hamlet.id, [{'message_id': msg_id}]) def normalize_string(s): s = s.strip() return re.sub(r'\s+', ' ', s) self.assertEquals(len(mail.outbox), 1) self.assertIn( 'Denmark > test Othello, the Moor of Venice 1 2 3 4 5 6 7 8 9 10 @**hamlet**', normalize_string(mail.outbox[0].body), ) class TestOpenRealms(AuthedTestCase): def test_open_realm_logic(self): mit_realm = get_realm("mit.edu") self.assertEquals(get_unique_open_realm(), None) mit_realm.restricted_to_domain = False mit_realm.save() self.assertTrue(completely_open(mit_realm.domain)) self.assertEquals(get_unique_open_realm(), None) settings.VOYAGER = True self.assertEquals(get_unique_open_realm(), mit_realm) # Restore state settings.VOYAGER = False mit_realm.restricted_to_domain = True mit_realm.save()