zulip/zerver/tests/test_home.py

603 lines
22 KiB
Python

import datetime
import os
import ujson
from django.http import HttpResponse
from django.test import override_settings
from mock import MagicMock, patch
from six.moves import urllib
from typing import Any, Dict, List
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import (
HostRequestMock, queries_captured, get_user_messages
)
from zerver.lib.soft_deactivation import do_soft_deactivate_users
from zerver.lib.test_runner import slow
from zerver.models import (
get_realm, get_stream, get_user, UserProfile, UserMessage, Recipient,
flush_per_request_caches, DefaultStream
)
from zerver.views.home import home, sent_time_in_epoch_seconds
class HomeTest(ZulipTestCase):
def test_home(self):
# type: () -> None
# Keep this list sorted!!!
html_bits = [
'Compose your message here...',
'Exclude messages with topic',
'Keyboard shortcuts',
'Loading...',
'Manage streams',
'Narrow by topic',
'Next message',
'Search streams',
'Welcome to Zulip',
'pygments.css',
'var page_params',
]
# Keep this list sorted!!!
expected_keys = [
"alert_words",
"attachments",
"autoscroll_forever",
"avatar_source",
"avatar_url",
"avatar_url_medium",
"can_create_streams",
"cross_realm_bots",
"custom_profile_fields",
"debug_mode",
"default_desktop_notifications",
"default_language",
"default_language_name",
"development_environment",
"email",
"emoji_alt_code",
"emojiset",
"emojiset_choices",
"enable_desktop_notifications",
"enable_digest_emails",
"enable_offline_email_notifications",
"enable_offline_push_notifications",
"enable_online_push_notifications",
"enable_sounds",
"enable_stream_desktop_notifications",
"enable_stream_push_notifications",
"enable_stream_sounds",
"enter_sends",
"first_in_realm",
"full_name",
"furthest_read_time",
"has_mobile_devices",
"have_initial_messages",
"high_contrast_mode",
"hotspots",
"initial_servertime",
"is_admin",
"language_list",
"language_list_dbl_col",
"last_event_id",
"left_side_userlist",
"login_page",
"max_avatar_file_size",
"max_icon_file_size",
"max_message_id",
"maxfilesize",
"muted_topics",
"narrow",
"narrow_stream",
"needs_tutorial",
"never_subscribed",
"password_min_length",
"password_min_quality",
"pm_content_in_desktop_notifications",
"pointer",
"poll_timeout",
"presences",
"prompt_for_invites",
"queue_id",
"realm_add_emoji_by_admins_only",
"realm_allow_edit_history",
"realm_allow_message_editing",
"realm_authentication_methods",
"realm_bot_domain",
"realm_bots",
"realm_create_stream_by_admins_only",
"realm_default_language",
"realm_default_streams",
"realm_description",
"realm_domains",
"realm_email_changes_disabled",
"realm_emoji",
"realm_filters",
"realm_icon_source",
"realm_icon_url",
"realm_inline_image_preview",
"realm_inline_url_embed_preview",
"realm_invite_by_admins_only",
"realm_invite_required",
"realm_is_zephyr_mirror_realm",
"realm_mandatory_topics",
"realm_message_content_edit_limit_seconds",
"realm_message_retention_days",
"realm_name",
"realm_name_changes_disabled",
"realm_notifications_stream_id",
"realm_password_auth_enabled",
"realm_presence_disabled",
"realm_restricted_to_domain",
"realm_show_digest_email",
"realm_uri",
"realm_users",
"realm_waiting_period_threshold",
"root_domain_uri",
"save_stacktraces",
"server_generation",
"server_inline_image_preview",
"server_inline_url_embed_preview",
"subscriptions",
"test_suite",
"timezone",
"total_uploads_size",
"twenty_four_hour_time",
"unread_msgs",
"unsubscribed",
"upload_quota",
"use_websockets",
"user_id",
"zulip_version",
]
email = self.example_email("hamlet")
# Verify fails if logged-out
result = self.client_get('/')
self.assertEqual(result.status_code, 302)
self.login(email)
# Create bot for realm_bots testing. Must be done before fetching home_page.
bot_info = {
'full_name': 'The Bot of Hamlet',
'short_name': 'hambot',
}
self.client_post("/json/bots", bot_info)
# Verify succeeds once logged-in
flush_per_request_caches()
with queries_captured() as queries:
result = self._get_home_page(stream='Denmark')
self.assert_length(queries, 41)
html = result.content.decode('utf-8')
for html_bit in html_bits:
if html_bit not in html:
raise AssertionError('%s not in result' % (html_bit,))
page_params = self._get_page_params(result)
actual_keys = sorted([str(k) for k in page_params.keys()])
self.assertEqual(actual_keys, expected_keys)
# TODO: Inspect the page_params data further.
# print(ujson.dumps(page_params, indent=2))
realm_bots_expected_keys = [
'api_key',
'avatar_url',
'bot_type',
'default_all_public_streams',
'default_events_register_stream',
'default_sending_stream',
'email',
'full_name',
'is_active',
'owner',
'user_id',
]
realm_bots_actual_keys = sorted([str(key) for key in page_params['realm_bots'][0].keys()])
self.assertEqual(realm_bots_actual_keys, realm_bots_expected_keys)
def test_num_queries_with_streams(self):
# type: () -> None
main_user = self.example_user('hamlet')
other_user = self.example_user('cordelia')
realm_id = main_user.realm_id
self.login(main_user.email)
# Try to make page-load do extra work for various subscribed
# streams.
for i in range(10):
stream_name = 'test_stream_' + str(i)
stream = self.make_stream(stream_name)
DefaultStream.objects.create(
realm_id=realm_id,
stream_id=stream.id
)
for user in [main_user, other_user]:
self.subscribe(user, stream_name)
# Simulate hitting the page the first time to avoid some noise
# related to initial logins.
self._get_home_page()
# Then for the second page load, measure the number of queries.
flush_per_request_caches()
with queries_captured() as queries2:
result = self._get_home_page()
self.assert_length(queries2, 34)
# Do a sanity check that our new streams were in the payload.
html = result.content.decode('utf-8')
self.assertIn('test_stream_7', html)
def _get_home_page(self, **kwargs):
# type: (**Any) -> HttpResponse
with \
patch('zerver.lib.events.request_event_queue', return_value=42), \
patch('zerver.lib.events.get_user_events', return_value=[]):
result = self.client_get('/', dict(**kwargs))
return result
def _get_page_params(self, result):
# type: (HttpResponse) -> Dict[str, Any]
html = result.content.decode('utf-8')
lines = html.split('\n')
page_params_line = [l for l in lines if l.startswith('var page_params')][0]
page_params_json = page_params_line.split(' = ')[1].rstrip(';')
page_params = ujson.loads(page_params_json)
return page_params
def _sanity_check(self, result):
# type: (HttpResponse) -> None
'''
Use this for tests that are geared toward specific edge cases, but
which still want the home page to load properly.
'''
html = result.content.decode('utf-8')
if 'Compose your message' not in html:
raise AssertionError('Home page probably did not load.')
def test_terms_of_service(self):
# type: () -> None
user = self.example_user('hamlet')
email = user.email
self.login(email)
for user_tos_version in [None, '1.1', '2.0.3.4']:
user.tos_version = user_tos_version
user.save()
with \
self.settings(TERMS_OF_SERVICE='whatever'), \
self.settings(TOS_VERSION='99.99'):
result = self.client_get('/', dict(stream='Denmark'))
html = result.content.decode('utf-8')
self.assertIn('There are new Terms of Service', html)
def test_terms_of_service_first_time_template(self):
# type: () -> None
user = self.example_user('hamlet')
email = user.email
self.login(email)
user.tos_version = None
user.save()
with \
self.settings(FIRST_TIME_TOS_TEMPLATE='hello.html'), \
self.settings(TOS_VERSION='99.99'):
result = self.client_post('/accounts/accept_terms/')
self.assertEqual(result.status_code, 200)
self.assert_in_response("I agree to the", result)
self.assert_in_response("most productive group chat", result)
def test_accept_terms_of_service(self):
# type: () -> None
email = self.example_email("hamlet")
self.login(email)
result = self.client_post('/accounts/accept_terms/')
self.assertEqual(result.status_code, 200)
self.assert_in_response("I agree to the", result)
result = self.client_post('/accounts/accept_terms/', {'terms': True})
self.assertEqual(result.status_code, 302)
self.assertEqual(result['Location'], '/')
def test_bad_narrow(self):
# type: () -> None
email = self.example_email("hamlet")
self.login(email)
with patch('logging.exception') as mock:
result = self._get_home_page(stream='Invalid Stream')
mock.assert_called_once_with('Narrow parsing')
self._sanity_check(result)
def test_bad_pointer(self):
# type: () -> None
user_profile = self.example_user('hamlet')
email = user_profile.email
user_profile.pointer = 999999
user_profile.save()
self.login(email)
with patch('logging.warning') as mock:
result = self._get_home_page()
mock.assert_called_once_with('hamlet@zulip.com has invalid pointer 999999')
self._sanity_check(result)
def test_topic_narrow(self):
# type: () -> None
email = self.example_email("hamlet")
self.login(email)
result = self._get_home_page(stream='Denmark', topic='lunch')
self._sanity_check(result)
html = result.content.decode('utf-8')
self.assertIn('lunch', html)
def test_notifications_stream(self):
# type: () -> None
email = self.example_email("hamlet")
realm = get_realm('zulip')
realm.notifications_stream = get_stream('Denmark', realm)
realm.save()
self.login(email)
result = self._get_home_page()
page_params = self._get_page_params(result)
self.assertEqual(page_params['realm_notifications_stream_id'], get_stream('Denmark', realm).id)
def test_people(self):
# type: () -> None
email = self.example_email('hamlet')
realm = get_realm('zulip')
self.login(email)
result = self._get_home_page()
page_params = self._get_page_params(result)
for params in ['realm_users', 'realm_bots']:
users = page_params['realm_users']
self.assertTrue(len(users) >= 3)
for user in users:
self.assertEqual(user['user_id'],
get_user(user['email'], realm).id)
cross_bots = page_params['cross_realm_bots']
self.assertEqual(len(cross_bots), 3)
cross_bots.sort(key=lambda d: d['email'])
notification_bot = self.notification_bot()
self.assertEqual(cross_bots, [
dict(
user_id=get_user('feedback@zulip.com', get_realm('zulip')).id,
is_admin=False,
email='feedback@zulip.com',
full_name='Zulip Feedback Bot',
is_bot=True
),
dict(
user_id=notification_bot.id,
is_admin=False,
email=notification_bot.email,
full_name='Notification Bot',
is_bot=True
),
dict(
user_id=get_user('welcome-bot@zulip.com', get_realm('zulip')).id,
is_admin=False,
email='welcome-bot@zulip.com',
full_name='Welcome Bot',
is_bot=True
),
])
def test_new_stream(self):
# type: () -> None
user_profile = self.example_user("hamlet")
stream_name = 'New stream'
self.subscribe(user_profile, stream_name)
self.login(user_profile.email)
result = self._get_home_page(stream=stream_name)
page_params = self._get_page_params(result)
self.assertEqual(page_params['narrow_stream'], stream_name)
self.assertEqual(page_params['narrow'], [dict(operator='stream', operand=stream_name)])
self.assertEqual(page_params['pointer'], -1)
self.assertEqual(page_params['max_message_id'], -1)
self.assertEqual(page_params['have_initial_messages'], False)
def test_invites_by_admins_only(self):
# type: () -> None
user_profile = self.example_user('hamlet')
email = user_profile.email
realm = user_profile.realm
realm.invite_by_admins_only = True
realm.save()
self.login(email)
self.assertFalse(user_profile.is_realm_admin)
result = self._get_home_page()
html = result.content.decode('utf-8')
self.assertNotIn('Invite more users', html)
user_profile.is_realm_admin = True
user_profile.save()
result = self._get_home_page()
html = result.content.decode('utf-8')
self.assertIn('Invite more users', html)
def test_desktop_home(self):
# type: () -> None
email = self.example_email("hamlet")
self.login(email)
result = self.client_get("/desktop_home")
self.assertEqual(result.status_code, 301)
self.assertTrue(result["Location"].endswith("/desktop_home/"))
result = self.client_get("/desktop_home/")
self.assertEqual(result.status_code, 302)
path = urllib.parse.urlparse(result['Location']).path
self.assertEqual(path, "/")
def test_apps_view(self):
# type: () -> None
result = self.client_get('/apps')
self.assertEqual(result.status_code, 301)
self.assertTrue(result['Location'].endswith('/apps/'))
with self.settings(ZILENCER_ENABLED=False):
result = self.client_get('/apps/')
self.assertEqual(result.status_code, 301)
self.assertTrue(result['Location'] == 'https://zulipchat.com/apps/')
with self.settings(ZILENCER_ENABLED=True):
result = self.client_get('/apps/')
self.assertEqual(result.status_code, 200)
html = result.content.decode('utf-8')
self.assertIn('Apps for every platform.', html)
def test_generate_204(self):
# type: () -> None
email = self.example_email("hamlet")
self.login(email)
result = self.client_get("/api/v1/generate_204")
self.assertEqual(result.status_code, 204)
def test_message_sent_time(self):
# type: () -> None
epoch_seconds = 1490472096
pub_date = datetime.datetime.fromtimestamp(epoch_seconds)
user_message = MagicMock()
user_message.message.pub_date = pub_date
self.assertEqual(sent_time_in_epoch_seconds(user_message), epoch_seconds)
def test_handlebars_compile_error(self):
# type: () -> None
request = HostRequestMock()
with self.settings(DEVELOPMENT=True):
with patch('os.path.exists', return_value=True):
result = home(request)
self.assertEqual(result.status_code, 500)
self.assert_in_response('Error compiling handlebars templates.', result)
def test_subdomain_homepage(self):
# type: () -> None
email = self.example_email("hamlet")
self.login(email)
with self.settings(ROOT_DOMAIN_LANDING_PAGE=True):
with patch('zerver.views.home.get_subdomain', return_value=""):
result = self._get_home_page()
self.assertEqual(result.status_code, 200)
self.assert_in_response('most productive group chat', result)
with patch('zerver.views.home.get_subdomain', return_value="subdomain"):
result = self._get_home_page()
self._sanity_check(result)
def send_stream_message(self, content, sender_name='iago',
stream_name='Denmark', subject='foo'):
# type: (str, str, str, str) -> None
sender = self.example_email(sender_name)
self.send_message(sender, stream_name, Recipient.STREAM,
content, subject)
def soft_activate_and_get_unread_count(self, stream='Denmark', topic='foo'):
# type: (str, str) -> int
stream_narrow = self._get_home_page(stream=stream, topic=topic)
page_params = self._get_page_params(stream_narrow)
return page_params['unread_msgs']['count']
def test_unread_count_user_soft_deactivation(self):
# type: () -> None
# In this test we make sure if a soft deactivated user had unread
# messages before deactivation they remain same way after activation.
long_term_idle_user = self.example_user('hamlet')
self.login(long_term_idle_user.email)
message = 'Test Message 1'
self.send_stream_message(message)
with queries_captured() as queries:
self.assertEqual(self.soft_activate_and_get_unread_count(), 1)
query_count = len(queries)
user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(user_msg_list[-1].content, message)
self.logout()
do_soft_deactivate_users([long_term_idle_user])
self.login(long_term_idle_user.email)
message = 'Test Message 2'
self.send_stream_message(message)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertNotEqual(idle_user_msg_list[-1].content, message)
with queries_captured() as queries:
self.assertEqual(self.soft_activate_and_get_unread_count(), 2)
# Test here for query count to be at least 5 greater than previous count
# This will assure indirectly that add_missing_messages() was called.
self.assertGreaterEqual(len(queries) - query_count, 5)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(idle_user_msg_list[-1].content, message)
def test_multiple_user_soft_deactivations(self):
# type: () -> None
long_term_idle_user = self.example_user('hamlet')
# We are sending this message to ensure that long_term_idle_user has
# at least one UserMessage row.
self.send_stream_message('Testing', sender_name='hamlet')
do_soft_deactivate_users([long_term_idle_user])
message = 'Test Message 1'
self.send_stream_message(message)
self.login(long_term_idle_user.email)
with queries_captured() as queries:
self.assertEqual(self.soft_activate_and_get_unread_count(), 2)
query_count = len(queries)
long_term_idle_user.refresh_from_db()
self.assertFalse(long_term_idle_user.long_term_idle)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(idle_user_msg_list[-1].content, message)
message = 'Test Message 2'
self.send_stream_message(message)
with queries_captured() as queries:
self.assertEqual(self.soft_activate_and_get_unread_count(), 3)
# Test here for query count to be at least 5 less than previous count.
# This will assure add_missing_messages() isn't repeatedly called.
self.assertGreaterEqual(query_count - len(queries), 5)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(idle_user_msg_list[-1].content, message)
self.logout()
do_soft_deactivate_users([long_term_idle_user])
message = 'Test Message 3'
self.send_stream_message(message)
self.login(long_term_idle_user.email)
with queries_captured() as queries:
self.assertEqual(self.soft_activate_and_get_unread_count(), 4)
query_count = len(queries)
long_term_idle_user.refresh_from_db()
self.assertFalse(long_term_idle_user.long_term_idle)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(idle_user_msg_list[-1].content, message)
message = 'Test Message 4'
self.send_stream_message(message)
with queries_captured() as queries:
self.assertEqual(self.soft_activate_and_get_unread_count(), 5)
self.assertGreaterEqual(query_count - len(queries), 5)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(idle_user_msg_list[-1].content, message)
self.logout()