# -*- coding: utf-8 -*-

from typing import (Any, Dict, Iterable, List, Mapping,
                    Optional, TypeVar, Union)

from zerver.lib.test_helpers import (
    queries_captured, simulated_empty_cache,
    tornado_redirected_to_list, get_subscription,
    most_recent_message,
)
from zerver.lib.test_classes import (
    ZulipTestCase,
)

from zerver.models import (
    UserProfile, Recipient, Realm,
    RealmDomain, UserHotspot, get_client,
    get_user, get_realm, get_stream, get_stream_recipient,
    get_source_profile,
    ScheduledEmail, check_valid_user_ids,
    get_user_by_id_in_realm_including_cross_realm, CustomProfileField,
)

from zerver.lib.avatar import avatar_url
from zerver.lib.exceptions import JsonableError
from zerver.lib.send_email import (
    send_future_email, clear_scheduled_emails,
    deliver_email,
)
from zerver.lib.actions import (
    get_emails_from_user_ids,
    get_recipient_info,
    do_deactivate_user,
    do_reactivate_user,
    do_change_is_admin,
    do_create_user,
    do_set_realm_property,
)
from zerver.lib.create_user import copy_user_settings
from zerver.lib.events import do_events_register
from zerver.lib.topic_mutes import add_topic_mute
from zerver.lib.stream_topic import StreamTopicTarget
from zerver.lib.users import (
    user_ids_to_users, access_user_by_id,
    get_accounts_for_email,
)

import datetime
import mock
import ujson

K = TypeVar('K')
V = TypeVar('V')


def find_dict(lst: Iterable[Dict[K, V]], k: K, v: V) -> Dict[K, V]:
    """Return the first dict in `lst` whose value under key `k` equals `v`.

    Raises AssertionError if no dict matches.  Every dict is indexed
    directly with `k`, so a dict lacking that key raises KeyError.
    """
    for dct in lst:
        if dct[k] == v:
            return dct
    raise AssertionError('Cannot find element in list where key %s == %s' % (k, v))


class PermissionTest(ZulipTestCase):
    """Tests for changing admin/guest status, full names and custom profile
    data, both through do_change_is_admin and PATCH /json/users/<id>."""

    def test_do_change_is_admin(self) -> None:
        """
        Ensures do_change_is_admin raises an AssertionError when invalid
        permissions are provided to it.
        """

        # this should work fine
        user_profile = self.example_user('hamlet')
        do_change_is_admin(user_profile, True)

        # this should work a-ok as well
        do_change_is_admin(user_profile, True, permission='administer')

        # this should "fail" with an AssertionError
        with self.assertRaises(AssertionError):
            do_change_is_admin(user_profile, True, permission='totally-not-valid-perm')

    def test_get_admin_users(self) -> None:
        user_profile = self.example_user('hamlet')
        do_change_is_admin(user_profile, False)
        admin_users = user_profile.realm.get_admin_users()
        self.assertNotIn(user_profile, admin_users)
        do_change_is_admin(user_profile, True)
        admin_users = user_profile.realm.get_admin_users()
        self.assertIn(user_profile, admin_users)

    def test_updating_non_existent_user(self) -> None:
        self.login(self.example_email("hamlet"))
        admin = self.example_user('hamlet')
        do_change_is_admin(admin, True)

        invalid_user_id = 1000
        result = self.client_patch('/json/users/{}'.format(invalid_user_id), {})
        self.assert_json_error(result, 'No such user')

    def test_admin_api(self) -> None:
        self.login(self.example_email("hamlet"))
        admin = self.example_user('hamlet')
        user = self.example_user('othello')
        realm = admin.realm
        do_change_is_admin(admin, True)

        # Make sure we see is_admin flag in /json/users
        result = self.client_get('/json/users')
        self.assert_json_success(result)
        members = result.json()['members']
        hamlet = find_dict(members, 'email', self.example_email("hamlet"))
        self.assertTrue(hamlet['is_admin'])
        othello = find_dict(members, 'email', self.example_email("othello"))
        self.assertFalse(othello['is_admin'])

        # Giveth
        req = dict(is_admin=ujson.dumps(True))

        events = []  # type: List[Mapping[str, Any]]
        with tornado_redirected_to_list(events):
            result = self.client_patch('/json/users/{}'.format(self.example_user("othello").id), req)
        self.assert_json_success(result)
        admin_users = realm.get_admin_users()
        self.assertIn(user, admin_users)
        person = events[0]['event']['person']
        self.assertEqual(person['email'], self.example_email("othello"))
        self.assertEqual(person['is_admin'], True)

        # Taketh away
        req = dict(is_admin=ujson.dumps(False))
        events = []
        with tornado_redirected_to_list(events):
            result = self.client_patch('/json/users/{}'.format(self.example_user("othello").id), req)
        self.assert_json_success(result)
        admin_users = realm.get_admin_users()
        self.assertNotIn(user, admin_users)
        person = events[0]['event']['person']
        self.assertEqual(person['email'], self.example_email("othello"))
        self.assertEqual(person['is_admin'], False)

        # Cannot take away from last admin
        self.login(self.example_email("iago"))
        req = dict(is_admin=ujson.dumps(False))
        events = []
        with tornado_redirected_to_list(events):
            result = self.client_patch('/json/users/{}'.format(self.example_user("hamlet").id), req)
        self.assert_json_success(result)
        admin_users = realm.get_admin_users()
        self.assertNotIn(admin, admin_users)
        person = events[0]['event']['person']
        self.assertEqual(person['email'], self.example_email("hamlet"))
        self.assertEqual(person['is_admin'], False)
        with tornado_redirected_to_list([]):
            result = self.client_patch('/json/users/{}'.format(self.example_user("iago").id), req)
        self.assert_json_error(result, 'Cannot remove the only organization administrator')

        # Make sure only admins can patch other user's info.
        self.login(self.example_email("othello"))
        result = self.client_patch('/json/users/{}'.format(self.example_user("hamlet").id), req)
        self.assert_json_error(result, 'Insufficient permission')

    def test_admin_api_hide_emails(self) -> None:
        user = self.example_user('hamlet')
        admin = self.example_user('iago')
        self.login(user.email)

        # First, verify client_gravatar works normally
        result = self.client_get('/json/users?client_gravatar=true')
        self.assert_json_success(result)
        members = result.json()['members']
        hamlet = find_dict(members, 'user_id', user.id)
        self.assertEqual(hamlet['email'], user.email)
        self.assertIsNone(hamlet['avatar_url'])
        self.assertNotIn('delivery_email', hamlet)

        # Also verify the /events code path.  This is a bit hacky, but
        # we need to verify client_gravatar is not being overridden.
        with mock.patch('zerver.lib.events.request_event_queue',
                        return_value=None) as mock_request_event_queue:
            with self.assertRaises(JsonableError):
                result = do_events_register(user, get_client("website"),
                                            client_gravatar=True)
            # Fourth positional argument of the first call is the
            # client_gravatar value that was actually forwarded.
            self.assertEqual(mock_request_event_queue.call_args_list[0][0][3],
                             True)

        #############################################################
        # Now, switch email address visibility, check client_gravatar
        # is automatically disabled for the user.
        do_set_realm_property(user.realm, "email_address_visibility",
                              Realm.EMAIL_ADDRESS_VISIBILITY_ADMINS)
        result = self.client_get('/json/users?client_gravatar=true')
        self.assert_json_success(result)
        members = result.json()['members']
        hamlet = find_dict(members, 'user_id', user.id)
        self.assertEqual(hamlet['email'], "user%s@zulip.testserver" % (user.id,))
        self.assertEqual(hamlet['avatar_url'],
                         "https://secure.gravatar.com/avatar/5106fb7f928d89e2f2ddb3c4c42d6949?d=identicon&version=1")
        self.assertNotIn('delivery_email', hamlet)

        # Also verify the /events code path.  This is a bit hacky, but
        # basically we want to verify client_gravatar is being
        # overridden.
        with mock.patch('zerver.lib.events.request_event_queue',
                        return_value=None) as mock_request_event_queue:
            with self.assertRaises(JsonableError):
                result = do_events_register(user, get_client("website"),
                                            client_gravatar=True)
            self.assertEqual(mock_request_event_queue.call_args_list[0][0][3],
                             False)

        # client_gravatar is still turned off for admins.  In theory,
        # it doesn't need to be, but client-side changes would be
        # required in apps like the mobile apps.
        # delivery_email is sent for admins.
        admin.refresh_from_db()
        self.login(admin.delivery_email)
        result = self.client_get('/json/users?client_gravatar=true')
        self.assert_json_success(result)
        members = result.json()['members']
        hamlet = find_dict(members, 'user_id', user.id)
        self.assertEqual(hamlet['email'], "user%s@zulip.testserver" % (user.id,))
        self.assertEqual(hamlet['avatar_url'],
                         "https://secure.gravatar.com/avatar/5106fb7f928d89e2f2ddb3c4c42d6949?d=identicon&version=1")
        self.assertEqual(hamlet['delivery_email'], self.example_email("hamlet"))

    def test_user_cannot_promote_to_admin(self) -> None:
        self.login(self.example_email("hamlet"))
        req = dict(is_admin=ujson.dumps(True))
        result = self.client_patch('/json/users/{}'.format(self.example_user('hamlet').id), req)
        self.assert_json_error(result, 'Insufficient permission')

    def test_admin_user_can_change_full_name(self) -> None:
        new_name = 'new name'
        self.login(self.example_email("iago"))
        hamlet = self.example_user('hamlet')
        req = dict(full_name=ujson.dumps(new_name))
        result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
        self.assert_json_success(result)
        hamlet = self.example_user('hamlet')
        self.assertEqual(hamlet.full_name, new_name)
        req['is_admin'] = ujson.dumps(False)
        result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
        self.assert_json_success(result)

    def test_non_admin_cannot_change_full_name(self) -> None:
        self.login(self.example_email("hamlet"))
        req = dict(full_name=ujson.dumps('new name'))
        result = self.client_patch('/json/users/{}'.format(self.example_user('othello').id), req)
        self.assert_json_error(result, 'Insufficient permission')

    def test_admin_cannot_set_long_full_name(self) -> None:
        new_name = 'a' * (UserProfile.MAX_NAME_LENGTH + 1)
        self.login(self.example_email("iago"))
        req = dict(full_name=ujson.dumps(new_name))
        result = self.client_patch('/json/users/{}'.format(self.example_user('hamlet').id), req)
        self.assert_json_error(result, 'Name too long!')

    def test_admin_cannot_set_short_full_name(self) -> None:
        new_name = 'a'
        self.login(self.example_email("iago"))
        req = dict(full_name=ujson.dumps(new_name))
        result = self.client_patch('/json/users/{}'.format(self.example_user('hamlet').id), req)
        self.assert_json_error(result, 'Name too short!')

    def test_admin_cannot_set_full_name_with_invalid_characters(self) -> None:
        new_name = 'Opheli*'
        self.login(self.example_email("iago"))
        req = dict(full_name=ujson.dumps(new_name))
        result = self.client_patch('/json/users/{}'.format(self.example_user('hamlet').id), req)
        self.assert_json_error(result, 'Invalid characters in name!')

    def test_access_user_by_id(self) -> None:
        iago = self.example_user("iago")

        # Must be a valid user ID in the realm
        with self.assertRaises(JsonableError):
            access_user_by_id(iago, 1234)
        with self.assertRaises(JsonableError):
            access_user_by_id(iago, self.mit_user("sipbtest").id)

        # Can only access bot users if allow_bots is passed
        bot = self.example_user("welcome_bot")
        access_user_by_id(iago, bot.id, allow_bots=True)
        with self.assertRaises(JsonableError):
            access_user_by_id(iago, bot.id)

        # Can only access deactivated users if allow_deactivated is passed
        hamlet = self.example_user("hamlet")
        do_deactivate_user(hamlet)
        with self.assertRaises(JsonableError):
            access_user_by_id(iago, hamlet.id)
        access_user_by_id(iago, hamlet.id, allow_deactivated=True)

        # Non-admin user can't admin another user
        with self.assertRaises(JsonableError):
            access_user_by_id(self.example_user("cordelia"), self.example_user("aaron").id)

    def test_change_regular_member_to_guest(self) -> None:
        iago = self.example_user("iago")
        self.login(iago.email)

        hamlet = self.example_user("hamlet")
        self.assertFalse(hamlet.is_guest)

        # Test failure of making user both admin and guest
        req = dict(is_guest=ujson.dumps(True), is_admin=ujson.dumps(True))
        result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
        self.assert_json_error(result, 'Guests cannot be organization administrators')
        # Re-fetch before asserting: the in-memory object predates the
        # request, so checking it would not detect an unwanted change.
        hamlet = self.example_user("hamlet")
        self.assertFalse(hamlet.is_guest)
        self.assertFalse(hamlet.is_realm_admin)

        req = dict(is_guest=ujson.dumps(True))
        events = []  # type: List[Mapping[str, Any]]
        with tornado_redirected_to_list(events):
            result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
        self.assert_json_success(result)

        hamlet = self.example_user("hamlet")
        self.assertTrue(hamlet.is_guest)
        self.assertFalse(hamlet.can_access_all_realm_members())
        person = events[0]['event']['person']
        self.assertEqual(person['email'], hamlet.email)
        self.assertTrue(person['is_guest'])

    def test_change_guest_to_regular_member(self) -> None:
        iago = self.example_user("iago")
        self.login(iago.email)

        polonius = self.example_user("polonius")
        self.assertTrue(polonius.is_guest)
        req = dict(is_guest=ujson.dumps(False))
        events = []  # type: List[Mapping[str, Any]]
        with tornado_redirected_to_list(events):
            result = self.client_patch('/json/users/{}'.format(polonius.id), req)
        self.assert_json_success(result)

        polonius = self.example_user("polonius")
        self.assertFalse(polonius.is_guest)
        person = events[0]['event']['person']
        self.assertEqual(person['email'], polonius.email)
        self.assertFalse(person['is_guest'])

    def test_change_admin_to_guest(self) -> None:
        iago = self.example_user("iago")
        self.login(iago.email)
        hamlet = self.example_user("hamlet")
        do_change_is_admin(hamlet, True)
        self.assertFalse(hamlet.is_guest)
        self.assertTrue(hamlet.is_realm_admin)

        # Test failure of making an admin to guest without revoking admin status
        req = dict(is_guest=ujson.dumps(True))
        result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
        self.assert_json_error(result, 'Guests cannot be organization administrators')

        # Test changing a user from admin to guest and revoking admin status
        hamlet = self.example_user("hamlet")
        self.assertFalse(hamlet.is_guest)
        req = dict(is_admin=ujson.dumps(False), is_guest=ujson.dumps(True))
        events = []  # type: List[Mapping[str, Any]]
        with tornado_redirected_to_list(events):
            result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
        self.assert_json_success(result)

        hamlet = self.example_user("hamlet")
        self.assertTrue(hamlet.is_guest)
        self.assertFalse(hamlet.is_realm_admin)

        person = events[0]['event']['person']
        self.assertEqual(person['email'], hamlet.email)
        self.assertFalse(person['is_admin'])

        person = events[1]['event']['person']
        self.assertEqual(person['email'], hamlet.email)
        self.assertTrue(person['is_guest'])

    def test_change_guest_to_admin(self) -> None:
        iago = self.example_user("iago")
        self.login(iago.email)
        polonius = self.example_user("polonius")
        self.assertTrue(polonius.is_guest)
        self.assertFalse(polonius.is_realm_admin)

        # Test failure of making a guest to admin without revoking guest status
        req = dict(is_admin=ujson.dumps(True))
        result = self.client_patch('/json/users/{}'.format(polonius.id), req)
        self.assert_json_error(result, 'Guests cannot be organization administrators')

        # Test changing a user from guest to admin and revoking guest status
        polonius = self.example_user("polonius")
        self.assertFalse(polonius.is_realm_admin)
        req = dict(is_admin=ujson.dumps(True), is_guest=ujson.dumps(False))
        events = []  # type: List[Mapping[str, Any]]
        with tornado_redirected_to_list(events):
            result = self.client_patch('/json/users/{}'.format(polonius.id), req)
        self.assert_json_success(result)

        polonius = self.example_user("polonius")
        self.assertFalse(polonius.is_guest)
        self.assertTrue(polonius.is_realm_admin)

        person = events[0]['event']['person']
        self.assertEqual(person['email'], polonius.email)
        self.assertTrue(person['is_admin'])

        person = events[1]['event']['person']
        self.assertEqual(person['email'], polonius.email)
        self.assertFalse(person['is_guest'])

    def test_admin_user_can_change_profile_data(self) -> None:
        realm = get_realm('zulip')
        self.login(self.example_email("iago"))
        new_profile_data = []
        cordelia = self.example_user("cordelia")

        # Test for all type of data
        fields = {
            'Phone number': 'short text data',
            'Biography': 'long text data',
            'Favorite food': 'short text data',
            'Favorite editor': 'vim',
            'Birthday': '1909-3-5',
            'GitHub profile': 'https://github.com/ABC',
            'Mentor': [cordelia.id],
        }

        for field_name in fields:
            field = CustomProfileField.objects.get(name=field_name, realm=realm)
            new_profile_data.append({
                'id': field.id,
                'value': fields[field_name],
            })

        result = self.client_patch('/json/users/{}'.format(cordelia.id),
                                   {'profile_data': ujson.dumps(new_profile_data)})
        self.assert_json_success(result)

        cordelia = self.example_user("cordelia")
        for field_dict in cordelia.profile_data:
            with self.subTest(field_name=field_dict['name']):
                # Invalid index type for dict key, it must be str but
                # field_dict values can be anything.
                self.assertEqual(field_dict['value'], fields[field_dict['name']])  # type: ignore

        # Test admin user cannot set invalid profile data
        invalid_fields = [
            ('Favorite editor', 'invalid choice', "'invalid choice' is not a valid choice for 'Favorite editor'."),
            ('Birthday', '1909-34-55', "Birthday is not a date"),
            ('GitHub profile', 'not url', "GitHub profile is not a URL"),
            ('Mentor', "not list of user ids", "User IDs is not a list"),
        ]

        for field_name, field_value, error_msg in invalid_fields:
            new_profile_data = []
            field = CustomProfileField.objects.get(name=field_name, realm=realm)
            new_profile_data.append({
                'id': field.id,
                'value': field_value,
            })

            result = self.client_patch('/json/users/{}'.format(cordelia.id),
                                       {'profile_data': ujson.dumps(new_profile_data)})
            self.assert_json_error(result, error_msg)

        # non-existent field and no data
        invalid_profile_data = [{
            'id': 9001,
            'value': ''
        }]
        result = self.client_patch('/json/users/{}'.format(cordelia.id),
                                   {'profile_data': ujson.dumps(invalid_profile_data)})
        self.assert_json_error(result, 'Field id 9001 not found.')

        # non-existent field and data
        invalid_profile_data = [{
            'id': 9001,
            'value': 'some data'
        }]
        result = self.client_patch('/json/users/{}'.format(cordelia.id),
                                   {'profile_data': ujson.dumps(invalid_profile_data)})
        self.assert_json_error(result, 'Field id 9001 not found.')

        # Test for clearing/resetting field values.
        empty_profile_data = []
        for field_name in fields:
            field = CustomProfileField.objects.get(name=field_name, realm=realm)
            value = ''  # type: Union[str, None, List[Any]]
            if field.field_type == CustomProfileField.USER:
                value = []
            empty_profile_data.append({
                'id': field.id,
                'value': value,
            })
        result = self.client_patch('/json/users/{}'.format(cordelia.id),
                                   {'profile_data': ujson.dumps(empty_profile_data)})
        self.assert_json_success(result)
        for field_dict in cordelia.profile_data:
            with self.subTest(field_name=field_dict['name']):
                self.assertEqual(field_dict['value'], None)

        # Test adding some of the field values after removing all.
        hamlet = self.example_user("hamlet")
        new_fields = {
            'Phone number': None,
            'Biography': 'A test user',
            'Favorite food': None,
            'Favorite editor': None,
            'Birthday': None,
            'GitHub profile': 'https://github.com/DEF',
            'Mentor': [hamlet.id]
        }
        new_profile_data = []
        for field_name in fields:
            field = CustomProfileField.objects.get(name=field_name, realm=realm)
            value = None
            if new_fields[field_name]:
                value = new_fields[field_name]
            new_profile_data.append({
                'id': field.id,
                'value': value,
            })
        result = self.client_patch('/json/users/{}'.format(cordelia.id),
                                   {'profile_data': ujson.dumps(new_profile_data)})
        self.assert_json_success(result)
        for field_dict in cordelia.profile_data:
            with self.subTest(field_name=field_dict['name']):
                self.assertEqual(field_dict['value'], new_fields[str(field_dict['name'])])

    def test_non_admin_user_cannot_change_profile_data(self) -> None:
        self.login(self.example_email("cordelia"))
        hamlet = self.example_user("hamlet")
        realm = get_realm("zulip")

        new_profile_data = []
        field = CustomProfileField.objects.get(name="Biography", realm=realm)
        new_profile_data.append({
            'id': field.id,
            'value': "New hamlet Biography",
        })
        result = self.client_patch('/json/users/{}'.format(hamlet.id),
                                   {'profile_data': ujson.dumps(new_profile_data)})
        self.assert_json_error(result, 'Insufficient permission')

        result = self.client_patch('/json/users/{}'.format(self.example_user("cordelia").id),
                                   {'profile_data': ujson.dumps(new_profile_data)})
        self.assert_json_error(result, 'Insufficient permission')


class AdminCreateUserTest(ZulipTestCase):
    """Tests for creating users through POST /json/users."""

    def test_create_user_backend(self) -> None:

        # This test should give us complete coverage on
        # create_user_backend.  It mostly exercises error
        # conditions, and it also does a basic test of the success
        # path.

        admin = self.example_user('hamlet')
        admin_email = admin.email
        realm = admin.realm
        self.login(admin_email)
        do_change_is_admin(admin, True)

        result = self.client_post("/json/users", dict())
        self.assert_json_error(result, "Missing 'email' argument")

        result = self.client_post("/json/users", dict(
            email='romeo@not-zulip.com',
        ))
        self.assert_json_error(result, "Missing 'password' argument")

        result = self.client_post("/json/users", dict(
            email='romeo@not-zulip.com',
            password='xxxx',
        ))
        self.assert_json_error(result, "Missing 'full_name' argument")

        result = self.client_post("/json/users", dict(
            email='romeo@not-zulip.com',
            password='xxxx',
            full_name='Romeo Montague',
        ))
        self.assert_json_error(result, "Missing 'short_name' argument")

        result = self.client_post("/json/users", dict(
            email='broken',
            password='xxxx',
            full_name='Romeo Montague',
            short_name='Romeo',
        ))
        self.assert_json_error(result, "Bad name or username")

        result = self.client_post("/json/users", dict(
            email='romeo@not-zulip.com',
            password='xxxx',
            full_name='Romeo Montague',
            short_name='Romeo',
        ))
        self.assert_json_error(result,
                               "Email 'romeo@not-zulip.com' not allowed in this organization")

        RealmDomain.objects.create(realm=get_realm('zulip'), domain='zulip.net')
        valid_params = dict(
            email='romeo@zulip.net',
            password='xxxx',
            full_name='Romeo Montague',
            short_name='Romeo',
        )
        result = self.client_post("/json/users", valid_params)
        self.assert_json_success(result)

        # Romeo is a newly registered user
        new_user = get_user('romeo@zulip.net', get_realm('zulip'))
        self.assertEqual(new_user.full_name, 'Romeo Montague')
        self.assertEqual(new_user.short_name, 'Romeo')

        # we can't create the same user twice.
        result = self.client_post("/json/users", valid_params)
        self.assert_json_error(result,
                               "Email 'romeo@zulip.net' already in use")

        # Don't allow user to sign up with disposable email.
        realm.emails_restricted_to_domains = False
        realm.disallow_disposable_email_addresses = True
        realm.save()

        valid_params["email"] = "abc@mailnator.com"
        result = self.client_post("/json/users", valid_params)
        self.assert_json_error(result, "Disposable email addresses are not allowed in this organization")

        # Don't allow creating a user with + in their email address when realm
        # is restricted to a domain.
        realm.emails_restricted_to_domains = True
        realm.save()

        valid_params["email"] = "iago+label@zulip.com"
        result = self.client_post("/json/users", valid_params)
        self.assert_json_error(result, "Email addresses containing + are not allowed.")

        # Users can be created with + in their email address when realm
        # is not restricted to a domain.
        realm.emails_restricted_to_domains = False
        realm.save()

        valid_params["email"] = "iago+label@zulip.com"
        result = self.client_post("/json/users", valid_params)
        self.assert_json_success(result)


class UserProfileTest(ZulipTestCase):
    """Tests for user lookup, user-ID validation and settings-copy helpers."""

    def test_get_emails_from_user_ids(self) -> None:
        hamlet = self.example_user('hamlet')
        othello = self.example_user('othello')
        dct = get_emails_from_user_ids([hamlet.id, othello.id])
        self.assertEqual(dct[hamlet.id], self.example_email("hamlet"))
        self.assertEqual(dct[othello.id], self.example_email("othello"))

    def test_valid_user_id(self) -> None:
        realm = get_realm("zulip")
        hamlet = self.example_user('hamlet')
        othello = self.example_user('othello')
        bot = self.example_user("welcome_bot")

        # Invalid user ID
        invalid_uid = 1000  # type: Any
        self.assertEqual(check_valid_user_ids(realm.id, invalid_uid),
                         "User IDs is not a list")
        self.assertEqual(check_valid_user_ids(realm.id, [invalid_uid]),
                         "Invalid user ID: %d" % (invalid_uid,))

        invalid_uid = "abc"
        self.assertEqual(check_valid_user_ids(realm.id, [invalid_uid]),
                         "User IDs[0] is not an integer")
        invalid_uid = str(othello.id)
        self.assertEqual(check_valid_user_ids(realm.id, [invalid_uid]),
                         "User IDs[0] is not an integer")

        # User is in different realm
        self.assertEqual(check_valid_user_ids(get_realm("zephyr").id, [hamlet.id]),
                         "Invalid user ID: %d" % (hamlet.id,))

        # User is not active
        hamlet.is_active = False
        hamlet.save()
        self.assertEqual(check_valid_user_ids(realm.id, [hamlet.id]),
                         "User with ID %d is deactivated" % (hamlet.id,))
        self.assertEqual(check_valid_user_ids(realm.id, [hamlet.id], allow_deactivated=True),
                         None)

        # User is a bot
        self.assertEqual(check_valid_user_ids(realm.id, [bot.id]),
                         "User with ID %d is a bot" % (bot.id,))

        # Successfully get non-bot, active user belong to your realm
        self.assertEqual(check_valid_user_ids(realm.id, [othello.id]), None)

    def test_cache_invalidation(self) -> None:
        hamlet = self.example_user('hamlet')
        with mock.patch('zerver.lib.cache.delete_display_recipient_cache') as m:
            hamlet.full_name = 'Hamlet Junior'
            hamlet.save(update_fields=["full_name"])

        self.assertTrue(m.called)

        with mock.patch('zerver.lib.cache.delete_display_recipient_cache') as m:
            hamlet.long_term_idle = True
            hamlet.save(update_fields=["long_term_idle"])

        self.assertFalse(m.called)

    def test_user_ids_to_users(self) -> None:
        real_user_ids = [
            self.example_user('hamlet').id,
            self.example_user('cordelia').id,
        ]

        self.assertEqual(user_ids_to_users([], get_realm("zulip")), [])
        found_ids = {
            user_profile.id
            for user_profile in user_ids_to_users(real_user_ids, get_realm("zulip"))
        }
        self.assertEqual(found_ids, set(real_user_ids))
        with self.assertRaises(JsonableError):
            user_ids_to_users([1234], get_realm("zephyr"))
        with self.assertRaises(JsonableError):
            user_ids_to_users(real_user_ids, get_realm("zephyr"))

    def test_bulk_get_users(self) -> None:
        from zerver.lib.users import bulk_get_users
        hamlet = self.example_email("hamlet")
        cordelia = self.example_email("cordelia")
        webhook_bot = self.example_email("webhook_bot")
        result = bulk_get_users([hamlet, cordelia], get_realm("zulip"))
        self.assertEqual(result[hamlet].email, hamlet)
        self.assertEqual(result[cordelia].email, cordelia)

        result = bulk_get_users([hamlet, cordelia, webhook_bot], None,
                                base_query=UserProfile.objects.all())
        self.assertEqual(result[hamlet].email, hamlet)
        self.assertEqual(result[cordelia].email, cordelia)
        self.assertEqual(result[webhook_bot].email, webhook_bot)

    def test_get_accounts_for_email(self) -> None:
        def check_account_present_in_accounts(user: UserProfile,
                                              accounts: List[Dict[str, Optional[str]]]) -> None:
            # Passes as soon as one account entry matches the user on
            # avatar, full name, realm name and realm string_id.
            realm = user.realm
            for account in accounts:
                if (account["avatar"] == avatar_url(user) and
                        account["full_name"] == user.full_name and
                        account["realm_name"] == realm.name and
                        account["string_id"] == realm.string_id):
                    return
            raise AssertionError("Account not found")

        lear_realm = get_realm("lear")
        cordelia_in_zulip = self.example_user("cordelia")
        cordelia_in_lear = get_user("cordelia@zulip.com", lear_realm)

        email = "cordelia@zulip.com"
        accounts = get_accounts_for_email(email)
        self.assert_length(accounts, 2)
        check_account_present_in_accounts(cordelia_in_zulip, accounts)
        check_account_present_in_accounts(cordelia_in_lear, accounts)

        email = "CORDelia@zulip.com"
        accounts = get_accounts_for_email(email)
        self.assert_length(accounts, 2)
        check_account_present_in_accounts(cordelia_in_zulip, accounts)
        check_account_present_in_accounts(cordelia_in_lear, accounts)

        email = "IAGO@ZULIP.COM"
        accounts = get_accounts_for_email(email)
        self.assert_length(accounts, 1)
        check_account_present_in_accounts(self.example_user("iago"), accounts)

    def test_get_source_profile(self) -> None:
        iago = get_source_profile("iago@zulip.com", "zulip")
        assert iago is not None
        self.assertEqual(iago.email, "iago@zulip.com")
        self.assertEqual(iago.realm, get_realm("zulip"))

        iago = get_source_profile("IAGO@ZULIP.com", "zulip")
        assert iago is not None
        self.assertEqual(iago.email, "iago@zulip.com")

        cordelia = get_source_profile("cordelia@zulip.com", "lear")
        assert cordelia is not None
        self.assertEqual(cordelia.email, "cordelia@zulip.com")

        self.assertIsNone(get_source_profile("iagod@zulip.com", "zulip"))
        self.assertIsNone(get_source_profile("iago@zulip.com", "ZULIP"))
        self.assertIsNone(get_source_profile("iago@zulip.com", "lear"))

    def test_copy_user_settings(self) -> None:
        iago = self.example_user("iago")
        cordelia = self.example_user("cordelia")
        hamlet = self.example_user("hamlet")

        cordelia.default_language = "de"
        cordelia.emojiset = "apple"
        cordelia.timezone = "America/Phoenix"
        cordelia.night_mode = True
        cordelia.enable_offline_email_notifications = False
        cordelia.enable_stream_push_notifications = True
        cordelia.enter_sends = False
        cordelia.save()

        UserHotspot.objects.filter(user=cordelia).delete()
        UserHotspot.objects.filter(user=iago).delete()
        hotspots_completed = ['intro_reply', 'intro_streams', 'intro_topics']
        for hotspot in hotspots_completed:
            UserHotspot.objects.create(user=cordelia, hotspot=hotspot)

        copy_user_settings(cordelia, iago)

        # We verify that cordelia and iago match, but hamlet has the defaults.
        self.assertEqual(iago.full_name, "Cordelia Lear")
        self.assertEqual(cordelia.full_name, "Cordelia Lear")
        self.assertEqual(hamlet.full_name, "King Hamlet")

        self.assertEqual(iago.default_language, "de")
        self.assertEqual(cordelia.default_language, "de")
        self.assertEqual(hamlet.default_language, "en")

        self.assertEqual(iago.emojiset, "apple")
        self.assertEqual(cordelia.emojiset, "apple")
        self.assertEqual(hamlet.emojiset, "google-blob")

        self.assertEqual(iago.timezone, "America/Phoenix")
        self.assertEqual(cordelia.timezone, "America/Phoenix")
        self.assertEqual(hamlet.timezone, "")

        self.assertEqual(iago.night_mode, True)
        self.assertEqual(cordelia.night_mode, True)
        self.assertEqual(hamlet.night_mode, False)

        self.assertEqual(iago.enable_offline_email_notifications, False)
        self.assertEqual(cordelia.enable_offline_email_notifications, False)
        self.assertEqual(hamlet.enable_offline_email_notifications, True)

        self.assertEqual(iago.enable_stream_push_notifications, True)
        self.assertEqual(cordelia.enable_stream_push_notifications, True)
        self.assertEqual(hamlet.enable_stream_push_notifications, False)

        self.assertEqual(iago.enter_sends, False)
        self.assertEqual(cordelia.enter_sends, False)
        self.assertEqual(hamlet.enter_sends, True)

        hotspots = list(UserHotspot.objects.filter(user=iago).values_list('hotspot', flat=True))
        # The query has no explicit ordering, so compare sorted lists
        # rather than relying on the row order the database returns.
        self.assertEqual(sorted(hotspots), sorted(hotspots_completed))

    def test_get_user_by_id_in_realm_including_cross_realm(self) -> None:
        realm = get_realm('zulip')
        hamlet = self.example_user('hamlet')
        othello = self.example_user('othello')
        bot = self.example_user('welcome_bot')

        # Pass in the ID of a cross-realm bot and a valid realm
        cross_realm_bot = get_user_by_id_in_realm_including_cross_realm(
            bot.id, realm)
        self.assertEqual(cross_realm_bot.email, bot.email)
        self.assertEqual(cross_realm_bot.id, bot.id)

        # Pass in the ID of a cross-realm bot but with an invalid realm,
        # note that the realm should be irrelevant here
        cross_realm_bot = get_user_by_id_in_realm_including_cross_realm(
            bot.id, None)
        self.assertEqual(cross_realm_bot.email, bot.email)
        self.assertEqual(cross_realm_bot.id, bot.id)

        # Pass in the ID of a non-cross-realm user with a realm
        user_profile = get_user_by_id_in_realm_including_cross_realm(
            othello.id, realm)
        self.assertEqual(user_profile.email, othello.email)
        self.assertEqual(user_profile.id, othello.id)

        # If the realm doesn't match, or if the ID is not that of a
        # cross-realm bot, UserProfile.DoesNotExist is raised
        with self.assertRaises(UserProfile.DoesNotExist):
            get_user_by_id_in_realm_including_cross_realm(
                hamlet.id, None)


class ActivateTest(ZulipTestCase):
    """Tests for deactivating/reactivating users and for clearing and
    delivering scheduled emails."""

    def test_basics(self) -> None:
        user = self.example_user('hamlet')
        do_deactivate_user(user)
        self.assertFalse(user.is_active)
        do_reactivate_user(user)
        self.assertTrue(user.is_active)

    def test_api(self) -> None:
        admin = self.example_user('othello')
        do_change_is_admin(admin, True)
        self.login(self.example_email("othello"))

        user = self.example_user('hamlet')
        self.assertTrue(user.is_active)

        result = self.client_delete('/json/users/{}'.format(user.id))
        self.assert_json_success(result)
        user = self.example_user('hamlet')
        self.assertFalse(user.is_active)

        result = self.client_post('/json/users/{}/reactivate'.format(user.id))
        self.assert_json_success(result)
        user = self.example_user('hamlet')
        self.assertTrue(user.is_active)

    def test_api_with_nonexistent_user(self) -> None:
        admin = self.example_user('othello')
        do_change_is_admin(admin, True)
        self.login(self.example_email("othello"))

        # Cannot deactivate a user with the bot api
        result = self.client_delete('/json/bots/{}'.format(self.example_user("hamlet").id))
        self.assert_json_error(result, 'No such bot')

        # Cannot deactivate a nonexistent user.
        invalid_user_id = 1000
        result = self.client_delete('/json/users/{}'.format(invalid_user_id))
        self.assert_json_error(result, 'No such user')

        result = self.client_delete('/json/users/{}'.format(self.example_user("webhook_bot").id))
        self.assert_json_error(result, 'No such user')

        result = self.client_delete('/json/users/{}'.format(self.example_user("iago").id))
        self.assert_json_success(result)

        result = self.client_delete('/json/users/{}'.format(admin.id))
        self.assert_json_error(result, 'Cannot deactivate the only organization administrator')

        # Cannot reactivate a nonexistent user.
        invalid_user_id = 1000
        result = self.client_post('/json/users/{}/reactivate'.format(invalid_user_id))
        self.assert_json_error(result, 'No such user')

    def test_api_with_insufficient_permissions(self) -> None:
        non_admin = self.example_user('othello')
        do_change_is_admin(non_admin, False)
        self.login(self.example_email("othello"))

        # Cannot deactivate a user with the users api
        result = self.client_delete('/json/users/{}'.format(self.example_user("hamlet").id))
        self.assert_json_error(result, 'Insufficient permission')

        # Cannot reactivate a user
        result = self.client_post('/json/users/{}/reactivate'.format(self.example_user("hamlet").id))
        self.assert_json_error(result, 'Insufficient permission')

    def test_clear_scheduled_jobs(self) -> None:
        user = self.example_user('hamlet')
        send_future_email('zerver/emails/followup_day1', user.realm,
                          to_user_ids=[user.id], delay=datetime.timedelta(hours=1))
        self.assertEqual(ScheduledEmail.objects.count(), 1)
        do_deactivate_user(user)
        self.assertEqual(ScheduledEmail.objects.count(), 0)

    def test_send_future_email_with_multiple_recipients(self) -> None:
        hamlet = self.example_user('hamlet')
        iago = self.example_user('iago')
        send_future_email('zerver/emails/followup_day1', iago.realm,
                          to_user_ids=[hamlet.id, iago.id], delay=datetime.timedelta(hours=1))
        self.assertEqual(ScheduledEmail.objects.filter(users__in=[hamlet, iago]).distinct().count(), 1)
        email = ScheduledEmail.objects.all().first()
        self.assertEqual(email.users.count(), 2)

    def test_clear_scheduled_emails_with_multiple_user_ids(self) -> None:
        hamlet = self.example_user('hamlet')
        iago = self.example_user('iago')
        send_future_email('zerver/emails/followup_day1', iago.realm,
                          to_user_ids=[hamlet.id, iago.id], delay=datetime.timedelta(hours=1))
        self.assertEqual(ScheduledEmail.objects.count(), 1)
        clear_scheduled_emails([hamlet.id, iago.id])
        self.assertEqual(ScheduledEmail.objects.count(), 0)

    def test_clear_schedule_emails_with_one_user_id(self) -> None:
        hamlet = self.example_user('hamlet')
        iago = self.example_user('iago')
        send_future_email('zerver/emails/followup_day1', iago.realm,
                          to_user_ids=[hamlet.id, iago.id], delay=datetime.timedelta(hours=1))
        self.assertEqual(ScheduledEmail.objects.count(), 1)
        clear_scheduled_emails([hamlet.id])
        # The email itself survives; only hamlet is dropped from its recipients.
        self.assertEqual(ScheduledEmail.objects.count(), 1)
        self.assertEqual(ScheduledEmail.objects.filter(users=hamlet).count(), 0)
        self.assertEqual(ScheduledEmail.objects.filter(users=iago).count(), 1)

    def test_deliver_email(self) -> None:
        iago = self.example_user('iago')
        hamlet = self.example_user('hamlet')
        send_future_email('zerver/emails/followup_day1', iago.realm,
                          to_user_ids=[hamlet.id, iago.id], delay=datetime.timedelta(hours=1))
        self.assertEqual(ScheduledEmail.objects.count(), 1)
        email = ScheduledEmail.objects.all().first()
        deliver_email(email)
        from django.core.mail import outbox
        self.assertEqual(len(outbox), 1)
        for message in outbox:
            self.assertEqual({hamlet.delivery_email, iago.delivery_email},
                             set(message.to))
        self.assertEqual(ScheduledEmail.objects.count(), 0)


class RecipientInfoTest(ZulipTestCase):
    """Tests for get_recipient_info."""

    def test_stream_recipient_info(self) -> None:
        hamlet = self.example_user('hamlet')
        cordelia = self.example_user('cordelia')
        othello = self.example_user('othello')

        realm = hamlet.realm

        stream_name = 'Test Stream'
        topic_name = 'test topic'

        for user in [hamlet, cordelia, othello]:
            self.subscribe(user, stream_name)

        stream = get_stream(stream_name, realm)
        recipient = get_stream_recipient(stream.id)

        stream_topic = StreamTopicTarget(
            stream_id=stream.id,
            topic_name=topic_name,
        )

        info = get_recipient_info(
            recipient=recipient,
            sender_id=hamlet.id,
            stream_topic=stream_topic,
        )

        all_user_ids = {hamlet.id, cordelia.id, othello.id}

        expected_info = dict(
            active_user_ids=all_user_ids,
            push_notify_user_ids=set(),
            stream_push_user_ids=set(),
            stream_email_user_ids=set(),
            um_eligible_user_ids=all_user_ids,
            long_term_idle_user_ids=set(),
            default_bot_user_ids=set(),
            service_bot_tuples=[],
        )

        self.assertEqual(info, expected_info)
hamlet.enable_stream_push_notifications = True hamlet.save() info = get_recipient_info( recipient=recipient, sender_id=hamlet.id, stream_topic=stream_topic, ) self.assertEqual(info['stream_push_user_ids'], {hamlet.id}) sub = get_subscription(stream_name, hamlet) sub.push_notifications = False sub.save() info = get_recipient_info( recipient=recipient, sender_id=hamlet.id, stream_topic=stream_topic, ) self.assertEqual(info['stream_push_user_ids'], set()) hamlet.enable_stream_push_notifications = False hamlet.save() sub = get_subscription(stream_name, hamlet) sub.push_notifications = True sub.save() info = get_recipient_info( recipient=recipient, sender_id=hamlet.id, stream_topic=stream_topic, ) self.assertEqual(info['stream_push_user_ids'], {hamlet.id}) # Now mute Hamlet to omit him from stream_push_user_ids. add_topic_mute( user_profile=hamlet, stream_id=stream.id, recipient_id=recipient.id, topic_name=topic_name, ) info = get_recipient_info( recipient=recipient, sender_id=hamlet.id, stream_topic=stream_topic, ) self.assertEqual(info['stream_push_user_ids'], set()) # Add a service bot. service_bot = do_create_user( email='service-bot@zulip.com', password='', realm=realm, full_name='', short_name='', bot_type=UserProfile.EMBEDDED_BOT, ) info = get_recipient_info( recipient=recipient, sender_id=hamlet.id, stream_topic=stream_topic, possibly_mentioned_user_ids={service_bot.id} ) self.assertEqual(info['service_bot_tuples'], [ (service_bot.id, UserProfile.EMBEDDED_BOT), ]) # Add a normal bot. 
normal_bot = do_create_user( email='normal-bot@zulip.com', password='', realm=realm, full_name='', short_name='', bot_type=UserProfile.DEFAULT_BOT, ) info = get_recipient_info( recipient=recipient, sender_id=hamlet.id, stream_topic=stream_topic, possibly_mentioned_user_ids={service_bot.id, normal_bot.id} ) self.assertEqual(info['default_bot_user_ids'], {normal_bot.id}) def test_get_recipient_info_invalid_recipient_type(self) -> None: hamlet = self.example_user('hamlet') realm = hamlet.realm stream = get_stream('Rome', realm) stream_topic = StreamTopicTarget( stream_id=stream.id, topic_name='test topic', ) # Make sure get_recipient_info asserts on invalid recipient types with self.assertRaisesRegex(ValueError, 'Bad recipient type'): invalid_recipient = Recipient(type=999) # 999 is not a valid type get_recipient_info( recipient=invalid_recipient, sender_id=hamlet.id, stream_topic=stream_topic, ) class BulkUsersTest(ZulipTestCase): def test_client_gravatar_option(self) -> None: self.login(self.example_email('cordelia')) hamlet = self.example_user('hamlet') def get_hamlet_avatar(client_gravatar: bool) -> Optional[str]: data = dict(client_gravatar=ujson.dumps(client_gravatar)) result = self.client_get('/json/users', data) self.assert_json_success(result) rows = result.json()['members'] hamlet_data = [ row for row in rows if row['user_id'] == hamlet.id ][0] return hamlet_data['avatar_url'] self.assertEqual( get_hamlet_avatar(client_gravatar=True), None ) ''' The main purpose of this test is to make sure we return None for avatar_url when client_gravatar is set to True. And we do a sanity check for when it's False, but we leave it to other tests to validate the specific URL. 
''' self.assertIn( 'gravatar.com', get_hamlet_avatar(client_gravatar=False), ) class GetProfileTest(ZulipTestCase): def common_update_pointer(self, email: str, pointer: int) -> None: self.login(email) result = self.client_post("/json/users/me/pointer", {"pointer": pointer}) self.assert_json_success(result) def common_get_profile(self, user_id: str) -> Dict[str, Any]: # Assumes all users are example users in realm 'zulip' user_profile = self.example_user(user_id) self.send_stream_message(user_profile.email, "Verona", "hello") result = self.api_get(user_profile.email, "/api/v1/users/me") max_id = most_recent_message(user_profile).id self.assert_json_success(result) json = result.json() self.assertIn("client_id", json) self.assertIn("max_message_id", json) self.assertIn("pointer", json) self.assertEqual(json["max_message_id"], max_id) return json def test_get_pointer(self) -> None: email = self.example_email("hamlet") self.login(email) result = self.client_get("/json/users/me/pointer") self.assert_json_success(result) self.assertIn("pointer", result.json()) def test_cache_behavior(self) -> None: """Tests whether fetching a user object the normal way, with `get_user`, makes 1 cache query and 1 database query. 
""" realm = get_realm("zulip") email = self.example_email("hamlet") with queries_captured() as queries: with simulated_empty_cache() as cache_queries: user_profile = get_user(email, realm) self.assert_length(queries, 1) self.assert_length(cache_queries, 1) self.assertEqual(user_profile.email, email) def test_get_user_profile(self) -> None: self.login(self.example_email("hamlet")) result = ujson.loads(self.client_get('/json/users/me').content) self.assertEqual(result['short_name'], 'hamlet') self.assertEqual(result['email'], self.example_email("hamlet")) self.assertEqual(result['full_name'], 'King Hamlet') self.assertIn("user_id", result) self.assertFalse(result['is_bot']) self.assertFalse(result['is_admin']) self.login(self.example_email("iago")) result = ujson.loads(self.client_get('/json/users/me').content) self.assertEqual(result['short_name'], 'iago') self.assertEqual(result['email'], self.example_email("iago")) self.assertEqual(result['full_name'], 'Iago') self.assertFalse(result['is_bot']) self.assertTrue(result['is_admin']) def test_api_get_empty_profile(self) -> None: """ Ensure GET /users/me returns a max message id and returns successfully """ json = self.common_get_profile("othello") self.assertEqual(json["pointer"], -1) def test_profile_with_pointer(self) -> None: """ Ensure GET /users/me returns a proper pointer id after the pointer is updated """ id1 = self.send_stream_message(self.example_email("othello"), "Verona") id2 = self.send_stream_message(self.example_email("othello"), "Verona") json = self.common_get_profile("hamlet") self.common_update_pointer(self.example_email("hamlet"), id2) json = self.common_get_profile("hamlet") self.assertEqual(json["pointer"], id2) self.common_update_pointer(self.example_email("hamlet"), id1) json = self.common_get_profile("hamlet") self.assertEqual(json["pointer"], id2) # pointer does not move backwards result = self.client_post("/json/users/me/pointer", {"pointer": 99999999}) self.assert_json_error(result, 
"Invalid message ID") def test_get_all_profiles_avatar_urls(self) -> None: user_profile = self.example_user('hamlet') result = self.api_get(self.example_email("hamlet"), "/api/v1/users") self.assert_json_success(result) for user in result.json()['members']: if user['email'] == self.example_email("hamlet"): self.assertEqual( user['avatar_url'], avatar_url(user_profile), )