# -*- coding: utf-8 -*- # See http://zulip.readthedocs.io/en/latest/events-system.html for # high-level documentation on how this system works. from __future__ import absolute_import from __future__ import print_function from typing import Any, Callable, Dict, List, Optional, Union, Text, Tuple import os import shutil from django.conf import settings from django.http import HttpRequest, HttpResponse from django.test import TestCase from django.utils.timezone import now as timezone_now from zerver.models import ( get_client, get_realm, get_recipient, get_stream, get_user, Message, RealmDomain, Recipient, UserMessage, UserPresence, UserProfile, Realm, ) from zerver.lib.actions import ( bulk_add_subscriptions, bulk_remove_subscriptions, do_add_alert_words, check_add_realm_emoji, check_send_typing_notification, notify_realm_custom_profile_fields, do_add_realm_filter, do_add_reaction, do_remove_reaction, do_change_avatar_fields, do_change_default_all_public_streams, do_change_default_events_register_stream, do_change_default_sending_stream, do_change_full_name, do_change_bot_owner, do_change_is_admin, do_change_stream_description, do_change_subscription_property, do_create_user, do_deactivate_stream, do_deactivate_user, do_mark_hotspot_as_read, do_reactivate_user, do_refer_friend, do_regenerate_api_key, do_remove_alert_words, do_remove_realm_emoji, do_remove_realm_filter, do_rename_stream, do_add_default_stream, do_remove_default_stream, do_set_muted_topics, do_set_realm_property, do_set_realm_authentication_methods, do_set_realm_message_editing, do_update_embedded_data, do_update_message, do_update_message_flags, do_update_muted_topic, do_update_pointer, do_update_user_presence, do_set_user_display_setting, do_change_notification_settings, do_add_realm_domain, do_change_realm_domain, do_remove_realm_domain, do_change_icon_source, log_event, do_delete_message, ) from zerver.lib.events import ( apply_events, fetch_initial_state_data, ) from zerver.lib.message import render_markdown from zerver.lib.test_helpers import POSTRequestMock, get_subscription from zerver.lib.test_classes import ( ZulipTestCase, ) from zerver.lib.validator import ( check_bool, check_dict, check_dict_only, check_float, check_int, check_list, check_string, equals, check_none_or, Validator ) from zerver.views.events_register import _default_all_public_streams, _default_narrow from zerver.tornado.event_queue import allocate_client_descriptor, EventQueue from zerver.tornado.views import get_events_backend from collections import OrderedDict import mock import time import ujson from six.moves import range class LogEventsTest(ZulipTestCase): def test_with_missing_event_log_dir_setting(self): # type: () -> None with self.settings(EVENT_LOG_DIR=None): log_event(None) def test_log_event_mkdir(self): # type: () -> None dir_name = 'var/test-log-dir' try: shutil.rmtree(dir_name) except OSError: # nocoverage # assume it doesn't exist already pass self.assertFalse(os.path.exists(dir_name)) with self.settings(EVENT_LOG_DIR=dir_name): event = {} # type: Dict[str, int] log_event(event) self.assertTrue(os.path.exists(dir_name)) class EventsEndpointTest(ZulipTestCase): def test_events_register_endpoint(self): # type: () -> None # This test is intended to get minimal coverage on the # events_register code paths email = self.example_email("hamlet") with mock.patch('zerver.views.events_register.do_events_register', return_value={}): result = self.client_post('/json/register', **self.api_auth(email)) self.assert_json_success(result) with mock.patch('zerver.lib.events.request_event_queue', return_value=None): result = self.client_post('/json/register', **self.api_auth(email)) self.assert_json_error(result, "Could not allocate event queue") with mock.patch('zerver.lib.events.request_event_queue', return_value='15:11'): with mock.patch('zerver.lib.events.get_user_events', return_value=[]): result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer'])), **self.api_auth(email)) self.assert_json_success(result) result_dict = ujson.loads(result.content) self.assertEqual(result_dict['last_event_id'], -1) self.assertEqual(result_dict['queue_id'], '15:11') with mock.patch('zerver.lib.events.request_event_queue', return_value='15:12'): with mock.patch('zerver.lib.events.get_user_events', return_value=[{ 'id': 6, 'type': 'pointer', 'pointer': 15, }]): result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer'])), **self.api_auth(email)) self.assert_json_success(result) result_dict = ujson.loads(result.content) self.assertEqual(result_dict['last_event_id'], 6) self.assertEqual(result_dict['pointer'], 15) self.assertEqual(result_dict['queue_id'], '15:12') # Now test with `fetch_event_types` not matching the event with mock.patch('zerver.lib.events.request_event_queue', return_value='15:13'): with mock.patch('zerver.lib.events.get_user_events', return_value=[{ 'id': 6, 'type': 'pointer', 'pointer': 15, }]): result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer']), fetch_event_types=ujson.dumps(['message'])), **self.api_auth(email)) self.assert_json_success(result) result_dict = ujson.loads(result.content) self.assertEqual(result_dict['last_event_id'], 6) # Check that the message event types data is in there self.assertIn('max_message_id', result_dict) # Check that the pointer event types data is not in there self.assertNotIn('pointer', result_dict) self.assertEqual(result_dict['queue_id'], '15:13') # Now test with `fetch_event_types` matching the event with mock.patch('zerver.lib.events.request_event_queue', return_value='15:13'): with mock.patch('zerver.lib.events.get_user_events', return_value=[{ 'id': 6, 'type': 'pointer', 'pointer': 15, }]): result = self.client_post('/json/register', dict(fetch_event_types=ujson.dumps(['pointer']), event_types=ujson.dumps(['message'])), **self.api_auth(email)) self.assert_json_success(result) result_dict = ujson.loads(result.content) self.assertEqual(result_dict['last_event_id'], 6) # Check that we didn't fetch the messages data self.assertNotIn('max_message_id', result_dict) # Check that the pointer data is in there, and is correctly # updated (presering our atomicity guaranteed), though of # course any future pointer events won't be distributed self.assertIn('pointer', result_dict) self.assertEqual(result_dict['pointer'], 15) self.assertEqual(result_dict['queue_id'], '15:13') def test_tornado_endpoint(self): # type: () -> None # This test is mostly intended to get minimal coverage on # the /notify_tornado endpoint, so we can have 100% URL coverage, # but it does exercise a little bit of the codepath. post_data = dict( data=ujson.dumps( dict( event=dict( type='other' ), users=[self.example_user('hamlet').id], ), ), ) req = POSTRequestMock(post_data, user_profile=None) req.META['REMOTE_ADDR'] = '127.0.0.1' result = self.client_post_request('/notify_tornado', req) self.assert_json_error(result, 'Access denied', status_code=403) post_data['secret'] = settings.SHARED_SECRET req = POSTRequestMock(post_data, user_profile=None) req.META['REMOTE_ADDR'] = '127.0.0.1' result = self.client_post_request('/notify_tornado', req) self.assert_json_success(result) class GetEventsTest(ZulipTestCase): def tornado_call(self, view_func, user_profile, post_data): # type: (Callable[[HttpRequest, UserProfile], HttpResponse], UserProfile, Dict[str, Any]) -> HttpResponse request = POSTRequestMock(post_data, user_profile) return view_func(request, user_profile) def test_get_events(self): # type: () -> None user_profile = self.example_user('hamlet') email = user_profile.email recipient_user_profile = self.example_user('othello') recipient_email = recipient_user_profile.email self.login(email) result = self.tornado_call(get_events_backend, user_profile, {"apply_markdown": ujson.dumps(True), "event_types": ujson.dumps(["message"]), "user_client": "website", "dont_block": ujson.dumps(True), }) self.assert_json_success(result) queue_id = ujson.loads(result.content)["queue_id"] recipient_result = self.tornado_call(get_events_backend, recipient_user_profile, {"apply_markdown": ujson.dumps(True), "event_types": ujson.dumps(["message"]), "user_client": "website", "dont_block": ujson.dumps(True), }) self.assert_json_success(recipient_result) recipient_queue_id = ujson.loads(recipient_result.content)["queue_id"] result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 0) local_id = 10.01 self.send_message(email, recipient_email, Recipient.PERSONAL, "hello", local_id=local_id, sender_queue_id=queue_id) result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 1) self.assertEqual(events[0]["type"], "message") self.assertEqual(events[0]["message"]["sender_email"], email) self.assertEqual(events[0]["local_message_id"], local_id) self.assertEqual(events[0]["message"]["display_recipient"][0]["is_mirror_dummy"], False) self.assertEqual(events[0]["message"]["display_recipient"][1]["is_mirror_dummy"], False) last_event_id = events[0]["id"] local_id += 0.01 self.send_message(email, recipient_email, Recipient.PERSONAL, "hello", local_id=local_id, sender_queue_id=queue_id) result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": last_event_id, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 1) self.assertEqual(events[0]["type"], "message") self.assertEqual(events[0]["message"]["sender_email"], email) self.assertEqual(events[0]["local_message_id"], local_id) # Test that the received message in the receiver's event queue # exists and does not contain a local id recipient_result = self.tornado_call(get_events_backend, recipient_user_profile, {"queue_id": recipient_queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) recipient_events = ujson.loads(recipient_result.content)["events"] self.assert_json_success(recipient_result) self.assertEqual(len(recipient_events), 2) self.assertEqual(recipient_events[0]["type"], "message") self.assertEqual(recipient_events[0]["message"]["sender_email"], email) self.assertTrue("local_message_id" not in recipient_events[0]) self.assertEqual(recipient_events[1]["type"], "message") self.assertEqual(recipient_events[1]["message"]["sender_email"], email) self.assertTrue("local_message_id" not in recipient_events[1]) def test_get_events_narrow(self): # type: () -> None user_profile = self.example_user('hamlet') email = user_profile.email self.login(email) result = self.tornado_call(get_events_backend, user_profile, {"apply_markdown": ujson.dumps(True), "event_types": ujson.dumps(["message"]), "narrow": ujson.dumps([["stream", "denmark"]]), "user_client": "website", "dont_block": ujson.dumps(True), }) self.assert_json_success(result) queue_id = ujson.loads(result.content)["queue_id"] result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 0) self.send_message(email, self.example_email("othello"), Recipient.PERSONAL, "hello") self.send_message(email, "Denmark", Recipient.STREAM, "hello") result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 1) self.assertEqual(events[0]["type"], "message") self.assertEqual(events[0]["message"]["display_recipient"], "Denmark") class EventsRegisterTest(ZulipTestCase): def setUp(self): # type: () -> None super(EventsRegisterTest, self).setUp() self.user_profile = self.example_user('hamlet') def create_bot(self, email): # type: (str) -> UserProfile return do_create_user(email, '123', get_realm('zulip'), 'Test Bot', 'test', bot_type=UserProfile.DEFAULT_BOT, bot_owner=self.user_profile) def realm_bot_schema(self, field_name, check): # type: (str, Validator) -> Validator return self.check_events_dict([ ('type', equals('realm_bot')), ('op', equals('update')), ('bot', check_dict_only([ ('email', check_string), ('user_id', check_int), (field_name, check), ])), ]) def do_test(self, action, event_types=None, include_subscribers=True, state_change_expected=True, num_events=1): # type: (Callable[[], Any], Optional[List[str]], bool, bool, int) -> List[Dict[str, Any]] client = allocate_client_descriptor( dict(user_profile_id = self.user_profile.id, user_profile_email = self.user_profile.email, realm_id = self.user_profile.realm_id, event_types = event_types, client_type_name = "website", apply_markdown = True, all_public_streams = False, queue_timeout = 600, last_connection_time = time.time(), narrow = []) ) # hybrid_state = initial fetch state + re-applying events triggered by our action # normal_state = do action then fetch at the end (the "normal" code path) hybrid_state = fetch_initial_state_data(self.user_profile, event_types, "", include_subscribers=include_subscribers) action() events = client.event_queue.contents() self.assertTrue(len(events) == num_events) before = ujson.dumps(hybrid_state) apply_events(hybrid_state, events, self.user_profile, include_subscribers=include_subscribers) after = ujson.dumps(hybrid_state) if state_change_expected: if before == after: print(events) # nocoverage raise AssertionError('Test does not exercise enough code -- events do not change state.') else: if before != after: raise AssertionError('Test is invalid--state actually does change here.') normal_state = fetch_initial_state_data(self.user_profile, event_types, "", include_subscribers=include_subscribers) self.match_states(hybrid_state, normal_state) return events def assert_on_error(self, error): # type: (Optional[str]) -> None if error: raise AssertionError(error) def match_states(self, state1, state2): # type: (Dict[str, Any], Dict[str, Any]) -> None def normalize(state): # type: (Dict[str, Any]) -> None state['realm_users'] = {u['email']: u for u in state['realm_users']} for u in state['subscriptions']: if 'subscribers' in u: u['subscribers'].sort() state['subscriptions'] = {u['name']: u for u in state['subscriptions']} state['unsubscribed'] = {u['name']: u for u in state['unsubscribed']} if 'realm_bots' in state: state['realm_bots'] = {u['email']: u for u in state['realm_bots']} normalize(state1) normalize(state2) self.assertEqual(state1, state2) def check_events_dict(self, required_keys): # type: (List[Tuple[str, Validator]]) -> Validator required_keys.append(('id', check_int)) return check_dict_only(required_keys) def test_send_message_events(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('message')), ('flags', check_list(None)), ('message', self.check_events_dict([ ('avatar_url', check_string), ('client', check_string), ('content', check_string), ('content_type', equals('text/html')), ('display_recipient', check_string), ('is_mentioned', check_bool), ('reactions', check_list(None)), ('recipient_id', check_int), ('sender_realm_str', check_string), ('sender_email', check_string), ('sender_full_name', check_string), ('sender_id', check_int), ('sender_short_name', check_string), ('stream_id', check_int), ('subject', check_string), ('subject_links', check_list(None)), ('timestamp', check_int), ('type', check_string), ])), ]) events = self.do_test( lambda: self.send_message(self.example_email("hamlet"), "Verona", Recipient.STREAM, "hello"), ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) # Verify message editing schema_checker = self.check_events_dict([ ('type', equals('update_message')), ('flags', check_list(None)), ('content', check_string), ('edit_timestamp', check_int), ('flags', check_list(None)), ('message_id', check_int), ('message_ids', check_list(check_int)), ('orig_content', check_string), ('orig_rendered_content', check_string), ('orig_subject', check_string), ('prev_rendered_content_version', check_int), ('propagate_mode', check_string), ('rendered_content', check_string), ('sender', check_string), ('stream_id', check_int), ('subject', check_string), ('subject_links', check_list(None)), ('user_id', check_int), ]) message = Message.objects.order_by('-id')[0] topic = 'new_topic' propagate_mode = 'change_all' content = 'new content' rendered_content = render_markdown(message, content) events = self.do_test( lambda: do_update_message(self.user_profile, message, topic, propagate_mode, content, rendered_content), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) # Verify do_update_embedded_data schema_checker = self.check_events_dict([ ('type', equals('update_message')), ('flags', check_list(None)), ('content', check_string), ('flags', check_list(None)), ('message_id', check_int), ('message_ids', check_list(check_int)), ('rendered_content', check_string), ('sender', check_string), ]) events = self.do_test( lambda: do_update_embedded_data(self.user_profile, message, u"embed_content", "
embed_content
"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_update_message_flags(self): # type: () -> None # Test message flag update events schema_checker = self.check_events_dict([ ('all', check_bool), ('type', equals('update_message_flags')), ('flag', check_string), ('messages', check_list(check_int)), ('operation', equals("add")), ]) message = self.send_message(self.example_email("cordelia"), self.example_email("hamlet"), Recipient.PERSONAL, "hello") user_profile = self.example_user('hamlet') events = self.do_test( lambda: do_update_message_flags(user_profile, 'add', 'starred', [message], False, None, None), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) schema_checker = self.check_events_dict([ ('all', check_bool), ('type', equals('update_message_flags')), ('flag', check_string), ('messages', check_list(check_int)), ('operation', equals("remove")), ]) events = self.do_test( lambda: do_update_message_flags(user_profile, 'remove', 'starred', [message], False, None, None), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_send_reaction(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('reaction')), ('op', equals('add')), ('message_id', check_int), ('emoji_name', check_string), ('user', check_dict_only([ ('email', check_string), ('full_name', check_string), ('user_id', check_int) ])), ]) message_id = self.send_message(self.example_email("hamlet"), "Verona", Recipient.STREAM, "hello") message = Message.objects.get(id=message_id) events = self.do_test( lambda: do_add_reaction( self.user_profile, message, "tada"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_remove_reaction(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('reaction')), ('op', equals('remove')), ('message_id', check_int), ('emoji_name', check_string), ('user', check_dict_only([ ('email', check_string), ('full_name', check_string), ('user_id', check_int) ])), ]) message_id = self.send_message(self.example_email("hamlet"), "Verona", Recipient.STREAM, "hello") message = Message.objects.get(id=message_id) events = self.do_test( lambda: do_remove_reaction( self.user_profile, message, "tada"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_typing_events(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('typing')), ('op', equals('start')), ('sender', check_dict_only([ ('email', check_string), ('user_id', check_int)])), ('recipients', check_list(check_dict_only([ ('email', check_string), ('user_id', check_int), ]))), ]) events = self.do_test( lambda: check_send_typing_notification( self.user_profile, [self.example_email("cordelia")], "start"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_custom_profile_fields_events(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('custom_profile_fields')), ('fields', check_list(check_dict_only([ ('type', check_int), ('name', check_string), ]))), ]) events = self.do_test( lambda: notify_realm_custom_profile_fields( self.user_profile.realm), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_presence_events(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('presence')), ('email', check_string), ('server_timestamp', check_float), ('presence', check_dict_only([ ('website', check_dict_only([ ('status', equals('active')), ('timestamp', check_int), ('client', check_string), ('pushable', check_bool), ])), ])), ]) events = self.do_test(lambda: do_update_user_presence( self.user_profile, get_client("website"), timezone_now(), UserPresence.ACTIVE)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_presence_events_multiple_clients(self): # type: () -> None schema_checker_android = self.check_events_dict([ ('type', equals('presence')), ('email', check_string), ('server_timestamp', check_float), ('presence', check_dict_only([ ('ZulipAndroid/1.0', check_dict_only([ ('status', equals('idle')), ('timestamp', check_int), ('client', check_string), ('pushable', check_bool), ])), ])), ]) self.client_post("/api/v1/users/me/presence", {'status': 'idle'}, HTTP_USER_AGENT="ZulipAndroid/1.0", **self.api_auth(self.user_profile.email)) self.do_test(lambda: do_update_user_presence( self.user_profile, get_client("website"), timezone_now(), UserPresence.ACTIVE)) events = self.do_test(lambda: do_update_user_presence( self.user_profile, get_client("ZulipAndroid/1.0"), timezone_now(), UserPresence.IDLE)) error = schema_checker_android('events[0]', events[0]) self.assert_on_error(error) def test_pointer_events(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('pointer')), ('pointer', check_int) ]) events = self.do_test(lambda: do_update_pointer(self.user_profile, 1500)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_referral_events(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('referral')), ('referrals', check_dict_only([ ('granted', check_int), ('used', check_int), ])), ]) events = self.do_test(lambda: do_refer_friend(self.user_profile, "friend@example.com")) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_register_events(self): # type: () -> None realm_user_add_checker = self.check_events_dict([ ('type', equals('realm_user')), ('op', equals('add')), ('person', check_dict_only([ ('user_id', check_int), ('email', check_string), ('avatar_url', check_string), ('full_name', check_string), ('is_admin', check_bool), ('is_bot', check_bool), ('timezone', check_string), ])), ]) events = self.do_test(lambda: self.register("test1@zulip.com", "test1")) self.assert_length(events, 1) error = realm_user_add_checker('events[0]', events[0]) self.assert_on_error(error) def test_alert_words_events(self): # type: () -> None alert_words_checker = self.check_events_dict([ ('type', equals('alert_words')), ('alert_words', check_list(check_string)), ]) events = self.do_test(lambda: do_add_alert_words(self.user_profile, ["alert_word"])) error = alert_words_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_remove_alert_words(self.user_profile, ["alert_word"])) error = alert_words_checker('events[0]', events[0]) self.assert_on_error(error) def test_default_streams_events(self): # type: () -> None default_streams_checker = self.check_events_dict([ ('type', equals('default_streams')), ('default_streams', check_list(check_dict_only([ ('description', check_string), ('invite_only', check_bool), ('name', check_string), ('stream_id', check_int), ]))), ]) stream = get_stream("Scotland", self.user_profile.realm) events = self.do_test(lambda: do_add_default_stream(stream)) error = default_streams_checker('events[0]', events[0]) events = self.do_test(lambda: do_remove_default_stream(stream)) error = default_streams_checker('events[0]', events[0]) self.assert_on_error(error) def test_muted_topics_events(self): # type: () -> None muted_topics_checker = self.check_events_dict([ ('type', equals('muted_topics')), ('muted_topics', check_list(check_list(check_string, 2))), ]) events = self.do_test(lambda: do_set_muted_topics(self.user_profile, [[u"Denmark", u"topic"]])) error = muted_topics_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_update_muted_topic( self.user_profile, "Denmark", "topic", "add")) error = muted_topics_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_update_muted_topic( self.user_profile, "Denmark", "topic", "remove")) error = muted_topics_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_avatar_fields(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict_only([ ('email', check_string), ('user_id', check_int), ('avatar_url', check_string), ])), ]) events = self.do_test( lambda: do_change_avatar_fields(self.user_profile, UserProfile.AVATAR_FROM_USER), ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_full_name(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict_only([ ('email', check_string), ('full_name', check_string), ('user_id', check_int), ])), ]) events = self.do_test(lambda: do_change_full_name(self.user_profile, 'Sir Hamlet', self.user_profile)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def do_set_realm_property_test(self, name): # type: (str) -> None bool_tests = [True, False, True] # type: List[bool] test_values = dict( add_emoji_by_admins_only=bool_tests, create_stream_by_admins_only=bool_tests, default_language=[u'es', u'de', u'en'], description=[u'Realm description', u'New description'], email_changes_disabled=bool_tests, invite_required=bool_tests, invite_by_admins_only=bool_tests, inline_image_preview=bool_tests, inline_url_embed_preview=bool_tests, message_retention_days=[10, 20], name=[u'Zulip', u'New Name'], name_changes_disabled=bool_tests, restricted_to_domain=bool_tests, waiting_period_threshold=[10, 20], ) # type: Dict[str, Any] property_type = Realm.property_types[name] if property_type is bool: validator = check_bool elif property_type is Text: validator = check_string elif property_type is int: validator = check_int elif property_type == (int, type(None)): validator = check_int else: raise AssertionError("Unexpected property type %s" % (property_type,)) schema_checker = self.check_events_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals(name)), ('value', validator), ]) vals = test_values.get(name) if vals is None: raise AssertionError('No test created for %s' % (name)) do_set_realm_property(self.user_profile.realm, name, vals[0]) for val in vals[1:]: events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, name, val)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_property(self): # type: () -> None for prop in Realm.property_types: self.do_set_realm_property_test(prop) def test_change_realm_authentication_methods(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('realm')), ('op', equals('update_dict')), ('property', equals('default')), ('data', check_dict_only([ ('authentication_methods', check_dict([])) ])), ]) def fake_backends(): # type: () -> Any backends = ( 'zproject.backends.DevAuthBackend', 'zproject.backends.EmailAuthBackend', 'zproject.backends.GitHubAuthBackend', 'zproject.backends.GoogleMobileOauth2Backend', 'zproject.backends.ZulipLDAPAuthBackend', ) return self.settings(AUTHENTICATION_BACKENDS=backends) # Test transitions; any new backends should be tested with T/T/T/F/T for (auth_method_dict) in \ ({'Google': True, 'Email': True, 'GitHub': True, 'LDAP': False, 'Dev': False}, {'Google': True, 'Email': True, 'GitHub': False, 'LDAP': False, 'Dev': False}, {'Google': True, 'Email': False, 'GitHub': False, 'LDAP': False, 'Dev': False}, {'Google': True, 'Email': False, 'GitHub': True, 'LDAP': False, 'Dev': False}, {'Google': False, 'Email': False, 'GitHub': False, 'LDAP': False, 'Dev': True}, {'Google': False, 'Email': False, 'GitHub': True, 'LDAP': False, 'Dev': True}, {'Google': False, 'Email': True, 'GitHub': True, 'LDAP': True, 'Dev': False}): with fake_backends(): events = self.do_test( lambda: do_set_realm_authentication_methods( self.user_profile.realm, auth_method_dict)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_pin_stream(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('subscription')), ('op', equals('update')), ('property', equals('pin_to_top')), ('stream_id', check_int), ('value', check_bool), ('name', check_string), ('email', check_string), ]) stream = get_stream("Denmark", self.user_profile.realm) sub = get_subscription(stream.name, self.user_profile) do_change_subscription_property(self.user_profile, sub, stream, "pin_to_top", False) for pinned in (True, False): events = self.do_test(lambda: do_change_subscription_property(self.user_profile, sub, stream, "pin_to_top", pinned)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_message_edit_settings(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('realm')), ('op', equals('update_dict')), ('property', equals('default')), ('data', check_dict_only([ ('allow_message_editing', check_bool), ('message_content_edit_limit_seconds', check_int), ])), ]) # Test every transition among the four possibilities {T,F} x {0, non-0} for (allow_message_editing, message_content_edit_limit_seconds) in \ ((True, 0), (False, 0), (True, 0), (False, 1234), (True, 0), (True, 1234), (True, 0), (False, 0), (False, 1234), (False, 0), (True, 1234), (False, 0), (True, 1234), (True, 600), (False, 600), (False, 1234), (True, 600)): events = self.do_test( lambda: do_set_realm_message_editing(self.user_profile.realm, allow_message_editing, message_content_edit_limit_seconds)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_is_admin(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict_only([ ('email', check_string), ('is_admin', check_bool), ('user_id', check_int), ])), ]) do_change_is_admin(self.user_profile, False) for is_admin in [True, False]: events = self.do_test(lambda: do_change_is_admin(self.user_profile, is_admin)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def do_set_user_display_settings_test(self, setting_name, values_list): # type: (str, List[Union[bool, Text]]) -> None property_type = UserProfile.property_types[setting_name] if property_type is bool: validator = check_bool elif property_type is Text: validator = check_string else: raise AssertionError("Unexpected property type %s" % (property_type,)) num_events = 1 if setting_name == "timezone": num_events = 2 if property_type == bool: do_set_user_display_setting(self.user_profile, setting_name, False) for value in values_list: events = self.do_test(lambda: do_set_user_display_setting( self.user_profile, setting_name, value), num_events=num_events) schema_checker = self.check_events_dict([ ('type', equals('update_display_settings')), ('setting_name', equals(setting_name)), ('user', check_string), ('setting', validator), ]) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) timezone_schema_checker = self.check_events_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict_only([ ('email', check_string), ('user_id', check_int), ('timezone', check_string), ])), ]) if setting_name == "timezone": error = timezone_schema_checker('events[1]', events[1]) def test_change_twenty_four_hour_time(self): # type: () -> None self.do_set_user_display_settings_test("twenty_four_hour_time", [True, False]) def test_change_left_side_userlist(self): # type: () -> None self.do_set_user_display_settings_test("left_side_userlist", [True, False]) def test_change_emoji_alt_code(self): # type: () -> None self.do_set_user_display_settings_test("emoji_alt_code", [True, False]) def test_change_emojiset(self): # type: () -> None self.do_set_user_display_settings_test("emojiset", [u'apple', u'twitter']) def test_change_default_language(self): # type: () -> None self.do_set_user_display_settings_test("default_language", [u'de', u'es', u'en']) def test_change_timezone(self): # type: () -> None self.do_set_user_display_settings_test("timezone", [u'US/Mountain', u'US/Samoa', u'Pacific/Galapagos', u'']) def test_change_notification_settings(self): # type: () -> None for notification_setting, v in self.user_profile.notification_setting_types.items(): schema_checker = self.check_events_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals(notification_setting)), ('user', check_string), ('setting', check_bool), ]) do_change_notification_settings(self.user_profile, notification_setting, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_notification_settings( self.user_profile, notification_setting, setting_value, log=False)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_realm_emoji_events(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('realm_emoji')), ('op', equals('update')), ('realm_emoji', check_dict([])), ]) events = self.do_test(lambda: check_add_realm_emoji(get_realm("zulip"), "my_emoji", "https://realm.com/my_emoji")) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_remove_realm_emoji(get_realm("zulip"), "my_emoji")) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_realm_filter_events(self): # type: () -> None schema_checker = self.check_events_dict([ ('type', equals('realm_filters')), ('realm_filters', check_list(None)), # TODO: validate tuples in the list ]) events = self.do_test(lambda: do_add_realm_filter(get_realm("zulip"), "#(?P