# -*- coding: utf-8 -*- # See http://zulip.readthedocs.io/en/latest/events-system.html for # high-level documentation on how this system works. from __future__ import absolute_import from __future__ import print_function from typing import Any, Callable, Dict, List, Optional, Union, Text from django.conf import settings from django.http import HttpRequest, HttpResponse from django.test import TestCase from django.utils.timezone import now as timezone_now from zerver.models import ( get_client, get_realm, get_recipient, get_stream, get_user_profile_by_email, Message, RealmDomain, Recipient, UserMessage, UserPresence, UserProfile, Realm, ) from zerver.lib.actions import ( bulk_add_subscriptions, bulk_remove_subscriptions, do_add_alert_words, check_add_realm_emoji, check_send_typing_notification, notify_realm_custom_profile_fields, do_add_realm_filter, do_add_reaction, do_remove_reaction, do_change_avatar_fields, do_change_default_all_public_streams, do_change_default_events_register_stream, do_change_default_sending_stream, do_change_full_name, do_change_bot_owner, do_change_is_admin, do_change_stream_description, do_change_subscription_property, do_create_user, do_deactivate_stream, do_deactivate_user, do_mark_hotspot_as_read, do_reactivate_user, do_refer_friend, do_regenerate_api_key, do_remove_alert_words, do_remove_realm_emoji, do_remove_realm_filter, do_rename_stream, do_add_default_stream, do_remove_default_stream, do_set_muted_topics, do_set_realm_property, do_set_realm_authentication_methods, do_set_realm_message_editing, do_update_embedded_data, do_update_message, do_update_message_flags, do_update_muted_topic, do_update_pointer, do_update_user_presence, do_set_user_display_setting, do_change_enable_stream_desktop_notifications, do_change_enable_stream_sounds, do_change_enable_desktop_notifications, do_change_enable_sounds, do_change_enable_offline_email_notifications, do_change_enable_offline_push_notifications, do_change_enable_online_push_notifications, do_change_pm_content_in_desktop_notifications, do_change_enable_digest_emails, do_add_realm_domain, do_change_realm_domain, do_remove_realm_domain, do_change_icon_source, ) from zerver.lib.events import ( apply_events, fetch_initial_state_data, ) from zerver.lib.message import render_markdown from zerver.lib.test_helpers import POSTRequestMock, get_subscription from zerver.lib.test_classes import ( ZulipTestCase, ) from zerver.lib.validator import ( check_bool, check_dict, check_dict_only, check_float, check_int, check_list, check_string, equals, check_none_or, Validator ) from zerver.views.events_register import _default_all_public_streams, _default_narrow from zerver.tornado.event_queue import allocate_client_descriptor, EventQueue from zerver.tornado.views import get_events_backend from collections import OrderedDict import mock import time import ujson from six.moves import range class EventsEndpointTest(ZulipTestCase): def test_events_register_endpoint(self): # type: () -> None # This test is intended to get minimal coverage on the # events_register code paths email = 'hamlet@zulip.com' with mock.patch('zerver.views.events_register.do_events_register', return_value={}): result = self.client_post('/json/register', **self.api_auth(email)) self.assert_json_success(result) with mock.patch('zerver.lib.events.request_event_queue', return_value=None): result = self.client_post('/json/register', **self.api_auth(email)) self.assert_json_error(result, "Could not allocate event queue") with mock.patch('zerver.lib.events.request_event_queue', return_value='15:11'): with mock.patch('zerver.lib.events.get_user_events', return_value=[]): result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer'])), **self.api_auth(email)) self.assert_json_success(result) result_dict = ujson.loads(result.content) self.assertEqual(result_dict['last_event_id'], -1) self.assertEqual(result_dict['queue_id'], '15:11') with mock.patch('zerver.lib.events.request_event_queue', return_value='15:12'): with mock.patch('zerver.lib.events.get_user_events', return_value=[{ 'id': 6, 'type': 'pointer', 'pointer': 15, }]): result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer'])), **self.api_auth(email)) self.assert_json_success(result) result_dict = ujson.loads(result.content) self.assertEqual(result_dict['last_event_id'], 6) self.assertEqual(result_dict['pointer'], 15) self.assertEqual(result_dict['queue_id'], '15:12') def test_tornado_endpoint(self): # type: () -> None # This test is mostly intended to get minimal coverage on # the /notify_tornado endpoint, so we can have 100% URL coverage, # but it does exercise a little bit of the codepath. post_data = dict( data=ujson.dumps( dict( event=dict( type='other' ), users=[get_user_profile_by_email('hamlet@zulip.com').id], ), ), ) req = POSTRequestMock(post_data, user_profile=None) req.META['REMOTE_ADDR'] = '127.0.0.1' result = self.client_post_request('/notify_tornado', req) self.assert_json_error(result, 'Access denied', status_code=403) post_data['secret'] = settings.SHARED_SECRET req = POSTRequestMock(post_data, user_profile=None) req.META['REMOTE_ADDR'] = '127.0.0.1' result = self.client_post_request('/notify_tornado', req) self.assert_json_success(result) class GetEventsTest(ZulipTestCase): def tornado_call(self, view_func, user_profile, post_data): # type: (Callable[[HttpRequest, UserProfile], HttpResponse], UserProfile, Dict[str, Any]) -> HttpResponse request = POSTRequestMock(post_data, user_profile) return view_func(request, user_profile) def test_get_events(self): # type: () -> None email = "hamlet@zulip.com" recipient_email = "othello@zulip.com" user_profile = get_user_profile_by_email(email) recipient_user_profile = get_user_profile_by_email(recipient_email) self.login(email) result = self.tornado_call(get_events_backend, user_profile, {"apply_markdown": ujson.dumps(True), "event_types": ujson.dumps(["message"]), "user_client": "website", "dont_block": ujson.dumps(True), }) self.assert_json_success(result) queue_id = ujson.loads(result.content)["queue_id"] recipient_result = self.tornado_call(get_events_backend, recipient_user_profile, {"apply_markdown": ujson.dumps(True), "event_types": ujson.dumps(["message"]), "user_client": "website", "dont_block": ujson.dumps(True), }) self.assert_json_success(recipient_result) recipient_queue_id = ujson.loads(recipient_result.content)["queue_id"] result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 0) local_id = 10.01 self.send_message(email, recipient_email, Recipient.PERSONAL, "hello", local_id=local_id, sender_queue_id=queue_id) result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 1) self.assertEqual(events[0]["type"], "message") self.assertEqual(events[0]["message"]["sender_email"], email) self.assertEqual(events[0]["local_message_id"], local_id) self.assertEqual(events[0]["message"]["display_recipient"][0]["is_mirror_dummy"], False) self.assertEqual(events[0]["message"]["display_recipient"][1]["is_mirror_dummy"], False) last_event_id = events[0]["id"] local_id += 0.01 self.send_message(email, recipient_email, Recipient.PERSONAL, "hello", local_id=local_id, sender_queue_id=queue_id) result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": last_event_id, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 1) self.assertEqual(events[0]["type"], "message") self.assertEqual(events[0]["message"]["sender_email"], email) self.assertEqual(events[0]["local_message_id"], local_id) # Test that the received message in the receiver's event queue # exists and does not contain a local id recipient_result = self.tornado_call(get_events_backend, recipient_user_profile, {"queue_id": recipient_queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) recipient_events = ujson.loads(recipient_result.content)["events"] self.assert_json_success(recipient_result) self.assertEqual(len(recipient_events), 2) self.assertEqual(recipient_events[0]["type"], "message") self.assertEqual(recipient_events[0]["message"]["sender_email"], email) self.assertTrue("local_message_id" not in recipient_events[0]) self.assertEqual(recipient_events[1]["type"], "message") self.assertEqual(recipient_events[1]["message"]["sender_email"], email) self.assertTrue("local_message_id" not in recipient_events[1]) def test_get_events_narrow(self): # type: () -> None email = "hamlet@zulip.com" user_profile = get_user_profile_by_email(email) self.login(email) result = self.tornado_call(get_events_backend, user_profile, {"apply_markdown": ujson.dumps(True), "event_types": ujson.dumps(["message"]), "narrow": ujson.dumps([["stream", "denmark"]]), "user_client": "website", "dont_block": ujson.dumps(True), }) self.assert_json_success(result) queue_id = ujson.loads(result.content)["queue_id"] result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 0) self.send_message(email, "othello@zulip.com", Recipient.PERSONAL, "hello") self.send_message(email, "Denmark", Recipient.STREAM, "hello") result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 1) self.assertEqual(events[0]["type"], "message") self.assertEqual(events[0]["message"]["display_recipient"], "Denmark") class EventsRegisterTest(ZulipTestCase): user_profile = get_user_profile_by_email("hamlet@zulip.com") maxDiff = None # type: Optional[int] def create_bot(self, email): # type: (str) -> UserProfile return do_create_user(email, '123', get_realm('zulip'), 'Test Bot', 'test', bot_type=UserProfile.DEFAULT_BOT, bot_owner=self.user_profile) def realm_bot_schema(self, field_name, check): # type: (str, Validator) -> Validator return check_dict_only([ ('id', check_int), ('type', equals('realm_bot')), ('op', equals('update')), ('bot', check_dict_only([ ('email', check_string), ('user_id', check_int), (field_name, check), ])), ]) def do_test(self, action, event_types=None, include_subscribers=True, state_change_expected=True, num_events=1): # type: (Callable[[], Any], Optional[List[str]], bool, bool, int) -> List[Dict[str, Any]] client = allocate_client_descriptor( dict(user_profile_id = self.user_profile.id, user_profile_email = self.user_profile.email, realm_id = self.user_profile.realm_id, event_types = event_types, client_type_name = "website", apply_markdown = True, all_public_streams = False, queue_timeout = 600, last_connection_time = time.time(), narrow = []) ) # hybrid_state = initial fetch state + re-applying events triggered by our action # normal_state = do action then fetch at the end (the "normal" code path) hybrid_state = fetch_initial_state_data(self.user_profile, event_types, "", include_subscribers=include_subscribers) action() events = client.event_queue.contents() self.assertTrue(len(events) == num_events) before = ujson.dumps(hybrid_state) apply_events(hybrid_state, events, self.user_profile, include_subscribers=include_subscribers) after = ujson.dumps(hybrid_state) if state_change_expected: if before == after: print(events) # nocoverage raise AssertionError('Test does not exercise enough code -- events do not change state.') else: if before != after: raise AssertionError('Test is invalid--state actually does change here.') normal_state = fetch_initial_state_data(self.user_profile, event_types, "", include_subscribers=include_subscribers) self.match_states(hybrid_state, normal_state) return events def assert_on_error(self, error): # type: (Optional[str]) -> None if error: raise AssertionError(error) def match_states(self, state1, state2): # type: (Dict[str, Any], Dict[str, Any]) -> None def normalize(state): # type: (Dict[str, Any]) -> None state['realm_users'] = {u['email']: u for u in state['realm_users']} for u in state['subscriptions']: if 'subscribers' in u: u['subscribers'].sort() state['subscriptions'] = {u['name']: u for u in state['subscriptions']} state['unsubscribed'] = {u['name']: u for u in state['unsubscribed']} if 'realm_bots' in state: state['realm_bots'] = {u['email']: u for u in state['realm_bots']} normalize(state1) normalize(state2) self.assertEqual(state1, state2) def test_send_message_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('message')), ('flags', check_list(None)), ('message', check_dict([ ('avatar_url', check_string), ('client', check_string), ('content', check_string), ('content_type', equals('text/html')), ('display_recipient', check_string), ('gravatar_hash', check_string), ('id', check_int), ('recipient_id', check_int), ('sender_realm_str', check_string), ('sender_email', check_string), ('sender_full_name', check_string), ('sender_id', check_int), ('sender_short_name', check_string), ('subject', check_string), ('subject_links', check_list(None)), ('timestamp', check_int), ('type', check_string), ])), ]) events = self.do_test( lambda: self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello"), ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) # Verify message editing schema_checker = check_dict([ ('type', equals('update_message')), ('flags', check_list(None)), ('content', check_string), ('edit_timestamp', check_int), ('flags', check_list(None)), ('message_id', check_int), ('message_ids', check_list(check_int)), ('orig_content', check_string), ('orig_rendered_content', check_string), ('orig_subject', check_string), ('propagate_mode', check_string), ('rendered_content', check_string), ('sender', check_string), ('stream_id', check_int), ('subject', check_string), ('subject_links', check_list(None)), ('user_id', check_int), # There is also a timestamp field in the event, but we ignore it, as # it's kind of an unwanted but harmless side effect of calling log_event. ]) message = Message.objects.order_by('-id')[0] topic = 'new_topic' propagate_mode = 'change_all' content = 'new content' rendered_content = render_markdown(message, content) events = self.do_test( lambda: do_update_message(self.user_profile, message, topic, propagate_mode, content, rendered_content), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) # Verify do_update_embedded_data schema_checker = check_dict([ ('type', equals('update_message')), ('flags', check_list(None)), ('content', check_string), ('flags', check_list(None)), ('message_id', check_int), ('message_ids', check_list(check_int)), ('rendered_content', check_string), ('sender', check_string), ]) events = self.do_test( lambda: do_update_embedded_data(self.user_profile, message, u"embed_content", "
embed_content
"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_update_message_flags(self): # type: () -> None # Test message flag update events schema_checker = check_dict([ ('id', check_int), ('type', equals('update_message_flags')), ('flag', check_string), ('messages', check_list(check_int)), ('operation', equals("add")), ]) message = self.send_message("cordelia@zulip.com", "hamlet@zulip.com", Recipient.PERSONAL, "hello") user_profile = get_user_profile_by_email("hamlet@zulip.com") events = self.do_test( lambda: do_update_message_flags(user_profile, 'add', 'starred', [message], False, None, None), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) schema_checker = check_dict([ ('id', check_int), ('type', equals('update_message_flags')), ('flag', check_string), ('messages', check_list(check_int)), ('operation', equals("remove")), ]) events = self.do_test( lambda: do_update_message_flags(user_profile, 'remove', 'starred', [message], False, None, None), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_send_reaction(self): # type: () -> None schema_checker = check_dict([ ('type', equals('reaction')), ('op', equals('add')), ('message_id', check_int), ('emoji_name', check_string), ('user', check_dict([ ('email', check_string), ('full_name', check_string), ('user_id', check_int) ])), ]) message_id = self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello") message = Message.objects.get(id=message_id) events = self.do_test( lambda: do_add_reaction( self.user_profile, message, "tada"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_remove_reaction(self): # type: () -> None schema_checker = check_dict([ ('type', equals('reaction')), ('op', equals('remove')), ('message_id', check_int), ('emoji_name', check_string), ('user', check_dict([ ('email', check_string), ('full_name', check_string), ('user_id', check_int) ])), ]) message_id = self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello") message = Message.objects.get(id=message_id) events = self.do_test( lambda: do_remove_reaction( self.user_profile, message, "tada"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_typing_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('typing')), ('op', equals('start')), ('sender', check_dict([ ('email', check_string), ('user_id', check_int)])), ('recipients', check_list(check_dict([ ('email', check_string), ('user_id', check_int), ]))), ]) events = self.do_test( lambda: check_send_typing_notification( self.user_profile, ["cordelia@zulip.com"], "start"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_custom_profile_fields_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('custom_profile_fields')), ('fields', check_list(check_dict([ ('id', check_int), ('type', check_int), ('name', check_string), ]))), ]) events = self.do_test( lambda: notify_realm_custom_profile_fields( self.user_profile.realm), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_presence_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('pointer')), ('email', check_string), ('timestamp', check_float), ('presence', check_dict([ # TODO: Add more here once the test below works ])), ]) # BUG: Marked as failing for now because this is a failing # test, due to the `aggregated` feature not being supported by # our events code. with self.assertRaises(AssertionError): events = self.do_test(lambda: do_update_user_presence( self.user_profile, get_client("website"), timezone_now(), UserPresence.ACTIVE)) # Marked as nocoverage since unreachable error = schema_checker('events[0]', events[0]) # nocoverage self.assert_on_error(error) # nocoverage def test_pointer_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('pointer')), ('pointer', check_int) ]) events = self.do_test(lambda: do_update_pointer(self.user_profile, 1500)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_referral_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('referral')), ('referrals', check_dict([ ('granted', check_int), ('used', check_int), ])), ]) events = self.do_test(lambda: do_refer_friend(self.user_profile, "friend@example.com")) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_register_events(self): # type: () -> None realm_user_add_checker = check_dict([ ('type', equals('realm_user')), ('op', equals('add')), ('person', check_dict([ ('email', check_string), ('full_name', check_string), ('is_admin', check_bool), ('is_bot', check_bool), ])), ]) events = self.do_test(lambda: self.register("test1@zulip.com", "test1")) self.assert_length(events, 1) error = realm_user_add_checker('events[0]', events[0]) self.assert_on_error(error) def test_alert_words_events(self): # type: () -> None alert_words_checker = check_dict([ ('type', equals('alert_words')), ('alert_words', check_list(check_string)), ]) events = self.do_test(lambda: do_add_alert_words(self.user_profile, ["alert_word"])) error = alert_words_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_remove_alert_words(self.user_profile, ["alert_word"])) error = alert_words_checker('events[0]', events[0]) self.assert_on_error(error) def test_default_streams_events(self): # type: () -> None default_streams_checker = check_dict([ ('type', equals('default_streams')), ('default_streams', check_list(check_dict([ ('description', check_string), ('invite_only', check_bool), ('name', check_string), ('stream_id', check_int), ]))), ]) stream = get_stream("Scotland", self.user_profile.realm) events = self.do_test(lambda: do_add_default_stream(stream)) error = default_streams_checker('events[0]', events[0]) events = self.do_test(lambda: do_remove_default_stream(stream)) error = default_streams_checker('events[0]', events[0]) self.assert_on_error(error) def test_muted_topics_events(self): # type: () -> None muted_topics_checker = check_dict([ ('type', equals('muted_topics')), ('muted_topics', check_list(check_list(check_string, 2))), ]) events = self.do_test(lambda: do_set_muted_topics(self.user_profile, [[u"Denmark", u"topic"]])) error = muted_topics_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_update_muted_topic( self.user_profile, "Denmark", "topic", "add")) error = muted_topics_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_update_muted_topic( self.user_profile, "Denmark", "topic", "remove")) error = muted_topics_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_avatar_fields(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict([ ('email', check_string), ('user_id', check_int), ('avatar_url', check_string), ])), ]) events = self.do_test( lambda: do_change_avatar_fields(self.user_profile, UserProfile.AVATAR_FROM_USER), ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_full_name(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict([ ('email', check_string), ('full_name', check_string), ])), ]) events = self.do_test(lambda: do_change_full_name(self.user_profile, 'Sir Hamlet')) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def do_set_realm_property_test(self, name): # type: (str) -> None bool_tests = [True, False, True] # type: List[bool] test_values = dict( add_emoji_by_admins_only=bool_tests, create_stream_by_admins_only=bool_tests, default_language=[u'es', u'de', u'en'], description=[u'Realm description', u'New description'], email_changes_disabled=bool_tests, invite_required=bool_tests, invite_by_admins_only=bool_tests, inline_image_preview=bool_tests, inline_url_embed_preview=bool_tests, message_retention_days=[10, 20], name=[u'Zulip', u'New Name'], name_changes_disabled=bool_tests, restricted_to_domain=bool_tests, waiting_period_threshold=[10, 20], ) # type: Dict[str, Any] property_type = Realm.property_types[name] if property_type is bool: validator = check_bool elif property_type is Text: validator = check_string elif property_type is int: validator = check_int elif property_type == (int, type(None)): validator = check_int else: raise AssertionError("Unexpected property type %s" % (property_type,)) schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals(name)), ('value', validator), ]) vals = test_values.get(name) if vals is None: raise AssertionError('No test created for %s' % (name)) do_set_realm_property(self.user_profile.realm, name, vals[0]) for val in vals[1:]: events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, name, val)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_property(self): # type: () -> None for prop in Realm.property_types: self.do_set_realm_property_test(prop) def test_change_realm_authentication_methods(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update_dict')), ('property', equals('default')), ('data', check_dict([])), ]) def fake_backends(): # type: () -> Any backends = ( 'zproject.backends.DevAuthBackend', 'zproject.backends.EmailAuthBackend', 'zproject.backends.GitHubAuthBackend', 'zproject.backends.GoogleMobileOauth2Backend', 'zproject.backends.ZulipLDAPAuthBackend', ) return self.settings(AUTHENTICATION_BACKENDS=backends) # Test transitions; any new backends should be tested with T/T/T/F/T for (auth_method_dict) in \ ({'Google': True, 'Email': True, 'GitHub': True, 'LDAP': False, 'Dev': False}, {'Google': True, 'Email': True, 'GitHub': False, 'LDAP': False, 'Dev': False}, {'Google': True, 'Email': False, 'GitHub': False, 'LDAP': False, 'Dev': False}, {'Google': True, 'Email': False, 'GitHub': True, 'LDAP': False, 'Dev': False}, {'Google': False, 'Email': False, 'GitHub': False, 'LDAP': False, 'Dev': True}, {'Google': False, 'Email': False, 'GitHub': True, 'LDAP': False, 'Dev': True}, {'Google': False, 'Email': True, 'GitHub': True, 'LDAP': True, 'Dev': False}): with fake_backends(): events = self.do_test( lambda: do_set_realm_authentication_methods( self.user_profile.realm, auth_method_dict)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_pin_stream(self): # type: () -> None schema_checker = check_dict([ ('type', equals('subscription')), ('op', equals('update')), ('property', equals('pin_to_top')), ('stream_id', check_int), ('value', check_bool), ]) stream = get_stream("Denmark", self.user_profile.realm) sub = get_subscription(stream.name, self.user_profile) do_change_subscription_property(self.user_profile, sub, stream, "pin_to_top", False) for pinned in (True, False): events = self.do_test(lambda: do_change_subscription_property(self.user_profile, sub, stream, "pin_to_top", pinned)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_message_edit_settings(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update_dict')), ('property', equals('default')), ('data', check_dict([('allow_message_editing', check_bool), ('message_content_edit_limit_seconds', check_int)])), ]) # Test every transition among the four possibilities {T,F} x {0, non-0} for (allow_message_editing, message_content_edit_limit_seconds) in \ ((True, 0), (False, 0), (True, 0), (False, 1234), (True, 0), (True, 1234), (True, 0), (False, 0), (False, 1234), (False, 0), (True, 1234), (False, 0), (True, 1234), (True, 600), (False, 600), (False, 1234), (True, 600)): events = self.do_test( lambda: do_set_realm_message_editing(self.user_profile.realm, allow_message_editing, message_content_edit_limit_seconds)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_is_admin(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict([ ('email', check_string), ('is_admin', check_bool), ])), ]) do_change_is_admin(self.user_profile, False) for is_admin in [True, False]: events = self.do_test(lambda: do_change_is_admin(self.user_profile, is_admin)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def do_set_user_display_settings_test(self, setting_name, values_list): # type: (str, List[Union[bool, Text]]) -> None property_type = UserProfile.property_types[setting_name] if property_type is bool: validator = check_bool elif property_type is Text: validator = check_string else: raise AssertionError("Unexpected property type %s" % (property_type,)) num_events = 1 if setting_name == "timezone": num_events = 2 if property_type == bool: do_set_user_display_setting(self.user_profile, setting_name, False) for value in values_list: events = self.do_test(lambda: do_set_user_display_setting( self.user_profile, setting_name, value), num_events=num_events) schema_checker = check_dict([ ('type', equals('update_display_settings')), ('setting_name', equals(setting_name)), ('user', check_string), ('setting', validator), ]) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) timezone_schema_checker = check_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict([ ('email', check_string), ('user_id', check_int), ('timezone', check_string), ])), ]) if setting_name == "timezone": error = timezone_schema_checker('events[1]', events[1]) def test_change_twenty_four_hour_time(self): # type: () -> None self.do_set_user_display_settings_test("twenty_four_hour_time", [True, False]) def test_change_left_side_userlist(self): # type: () -> None self.do_set_user_display_settings_test("left_side_userlist", [True, False]) def test_change_emoji_alt_code(self): # type: () -> None self.do_set_user_display_settings_test("emoji_alt_code", [True, False]) def test_change_default_language(self): # type: () -> None self.do_set_user_display_settings_test("default_language", [u'de', u'es', u'en']) def test_change_timezone(self): # type: () -> None self.do_set_user_display_settings_test("timezone", [u'US/Mountain', u'US/Samoa', u'Pacific/Galapagos', u'']) def test_change_enable_stream_desktop_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_stream_desktop_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_stream_desktop_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_stream_desktop_notifications(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_stream_sounds(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_stream_sounds')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_stream_sounds(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_stream_sounds(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_desktop_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_desktop_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_desktop_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_desktop_notifications(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_sounds(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_sounds')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_sounds(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_sounds(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_offline_email_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_offline_email_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_offline_email_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_offline_email_notifications(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_offline_push_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_offline_push_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_offline_push_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_offline_push_notifications(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_online_push_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_online_push_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_online_push_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_online_push_notifications(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_pm_content_in_desktop_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('pm_content_in_desktop_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_pm_content_in_desktop_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test( lambda: do_change_pm_content_in_desktop_notifications(self.user_profile, setting_value), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_digest_emails(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_digest_emails')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_digest_emails(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_digest_emails(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_realm_emoji_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm_emoji')), ('op', equals('update')), ('realm_emoji', check_dict([])), ]) events = self.do_test(lambda: check_add_realm_emoji(get_realm("zulip"), "my_emoji", "https://realm.com/my_emoji")) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_remove_realm_emoji(get_realm("zulip"), "my_emoji")) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_realm_filter_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm_filters')), ('realm_filters', check_list(None)), # TODO: validate tuples in the list ]) events = self.do_test(lambda: do_add_realm_filter(get_realm("zulip"), "#(?P