import datetime from datetime import timedelta from typing import Any, Dict from unittest import mock from django.utils.timezone import now as timezone_now from zerver.lib.actions import do_deactivate_user from zerver.lib.presence import get_status_dict_by_realm from zerver.lib.statistics import seconds_usage_between from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import make_client, queries_captured, reset_emails_in_zulip_realm from zerver.lib.timestamp import datetime_to_timestamp from zerver.models import ( Client, PushDeviceToken, UserActivity, UserActivityInterval, UserPresence, UserProfile, flush_per_request_caches, ) class ActivityTest(ZulipTestCase): @mock.patch("stripe.Customer.list", return_value=[]) def test_activity(self, unused_mock: mock.Mock) -> None: self.login('hamlet') client, _ = Client.objects.get_or_create(name='website') query = '/json/messages/flags' last_visit = timezone_now() count = 150 for activity_user_profile in UserProfile.objects.all(): UserActivity.objects.get_or_create( user_profile=activity_user_profile, client=client, query=query, count=count, last_visit=last_visit, ) # Fails when not staff result = self.client_get('/activity') self.assertEqual(result.status_code, 302) user_profile = self.example_user("hamlet") user_profile.is_staff = True user_profile.save() flush_per_request_caches() with queries_captured() as queries: result = self.client_get('/activity') self.assertEqual(result.status_code, 200) self.assert_length(queries, 14) flush_per_request_caches() with queries_captured() as queries: result = self.client_get('/realm_activity/zulip/') self.assertEqual(result.status_code, 200) self.assert_length(queries, 8) flush_per_request_caches() with queries_captured() as queries: result = self.client_get('/user_activity/iago@zulip.com/') self.assertEqual(result.status_code, 200) self.assert_length(queries, 4) class TestClientModel(ZulipTestCase): def test_client_stringification(self) -> None: ''' This test is designed to cover __str__ method for Client. ''' client = make_client('some_client') self.assertEqual(str(client), '') class UserPresenceModelTests(ZulipTestCase): def test_date_logic(self) -> None: UserPresence.objects.all().delete() user_profile = self.example_user('hamlet') email = user_profile.email presence_dct = get_status_dict_by_realm(user_profile.realm_id) self.assertEqual(len(presence_dct), 0) self.login_user(user_profile) result = self.client_post("/json/users/me/presence", {'status': 'active'}) self.assert_json_success(result) slim_presence = False presence_dct = get_status_dict_by_realm(user_profile.realm_id, slim_presence) self.assertEqual(len(presence_dct), 1) self.assertEqual(presence_dct[email]['website']['status'], 'active') slim_presence = True presence_dct = get_status_dict_by_realm(user_profile.realm_id, slim_presence) self.assertEqual(len(presence_dct), 1) info = presence_dct[str(user_profile.id)] self.assertEqual(set(info.keys()), {'active_timestamp'}) def back_date(num_weeks: int) -> None: user_presence = UserPresence.objects.filter(user_profile=user_profile)[0] user_presence.timestamp = timezone_now() - datetime.timedelta(weeks=num_weeks) user_presence.save() # Simulate the presence being a week old first. Nothing should change. back_date(num_weeks=1) presence_dct = get_status_dict_by_realm(user_profile.realm_id) self.assertEqual(len(presence_dct), 1) # If the UserPresence row is three weeks old, we ignore it. back_date(num_weeks=3) presence_dct = get_status_dict_by_realm(user_profile.realm_id) self.assertEqual(len(presence_dct), 0) def test_push_tokens(self) -> None: UserPresence.objects.all().delete() user_profile = self.example_user('hamlet') email = user_profile.email self.login_user(user_profile) result = self.client_post("/json/users/me/presence", {'status': 'active'}) self.assert_json_success(result) def pushable() -> bool: presence_dct = get_status_dict_by_realm(user_profile.realm_id) self.assertEqual(len(presence_dct), 1) return presence_dct[email]['website']['pushable'] self.assertFalse(pushable()) user_profile.enable_offline_push_notifications = True user_profile.save() self.assertFalse(pushable()) PushDeviceToken.objects.create( user=user_profile, kind=PushDeviceToken.APNS, ) self.assertTrue(pushable()) class UserPresenceTests(ZulipTestCase): def test_invalid_presence(self) -> None: user = self.example_user("hamlet") self.login_user(user) result = self.client_post("/json/users/me/presence", {'status': 'foo'}) self.assert_json_error(result, 'Invalid status: foo') def test_set_idle(self) -> None: client = 'website' hamlet = self.example_user('hamlet') othello = self.example_user('othello') self.login_user(hamlet) params = dict(status='idle') result = self.client_post("/json/users/me/presence", params) self.assert_json_success(result) json = result.json() self.assertEqual(json['presences'][hamlet.email][client]['status'], 'idle') self.assertIn('timestamp', json['presences'][hamlet.email][client]) self.assertIsInstance(json['presences'][hamlet.email][client]['timestamp'], int) self.assertEqual(set(json['presences'].keys()), {hamlet.email}) self.login_user(othello) params = dict(status='idle', slim_presence='true') result = self.client_post("/json/users/me/presence", params) json = result.json() presences = json['presences'] self.assertEqual( set(presences.keys()), {str(hamlet.id), str(othello.id)}, ) hamlet_info = presences[str(hamlet.id)] othello_info = presences[str(othello.id)] self.assertEqual(set(hamlet_info.keys()), {'idle_timestamp'}) self.assertEqual(set(othello_info.keys()), {'idle_timestamp'}) self.assertGreaterEqual( othello_info['idle_timestamp'], hamlet_info['idle_timestamp'], ) def test_set_active(self) -> None: hamlet = self.example_user('hamlet') othello = self.example_user('othello') self.login_user(hamlet) client = 'website' params = dict(status='idle') result = self.client_post("/json/users/me/presence", params) self.assert_json_success(result) self.assertEqual(result.json()['presences'][hamlet.email][client]['status'], 'idle') self.login('othello') params = dict(status='idle') result = self.client_post("/json/users/me/presence", params) self.assert_json_success(result) json = result.json() self.assertEqual(json['presences'][othello.email][client]['status'], 'idle') self.assertEqual(json['presences'][hamlet.email][client]['status'], 'idle') params = dict(status='active') result = self.client_post("/json/users/me/presence", params) self.assert_json_success(result) json = result.json() self.assertEqual(json['presences'][othello.email][client]['status'], 'active') self.assertEqual(json['presences'][hamlet.email][client]['status'], 'idle') self.login_user(hamlet) params = dict(status='active', slim_presence='true') result = self.client_post("/json/users/me/presence", params) self.assert_json_success(result) json = result.json() presences = json['presences'] self.assertEqual( set(presences.keys()), {str(hamlet.id), str(othello.id)}, ) othello_info = presences[str(othello.id)] hamlet_info = presences[str(hamlet.id)] self.assertEqual( set(othello_info.keys()), {'active_timestamp'}, ) self.assertEqual( set(hamlet_info.keys()), {'active_timestamp'}, ) self.assertGreaterEqual( hamlet_info['active_timestamp'], othello_info['active_timestamp'], ) @mock.patch("stripe.Customer.list", return_value=[]) def test_new_user_input(self, unused_mock: mock.Mock) -> None: """Mostly a test for UserActivityInterval""" user_profile = self.example_user("hamlet") self.login('hamlet') self.assertEqual(UserActivityInterval.objects.filter(user_profile=user_profile).count(), 0) time_zero = timezone_now().replace(microsecond=0) with mock.patch('zerver.views.presence.timezone_now', return_value=time_zero): result = self.client_post("/json/users/me/presence", {'status': 'active', 'new_user_input': 'true'}) self.assert_json_success(result) self.assertEqual(UserActivityInterval.objects.filter(user_profile=user_profile).count(), 1) interval = UserActivityInterval.objects.get(user_profile=user_profile) self.assertEqual(interval.start, time_zero) self.assertEqual(interval.end, time_zero + UserActivityInterval.MIN_INTERVAL_LENGTH) second_time = time_zero + timedelta(seconds=600) # Extent the interval with mock.patch('zerver.views.presence.timezone_now', return_value=second_time): result = self.client_post("/json/users/me/presence", {'status': 'active', 'new_user_input': 'true'}) self.assert_json_success(result) self.assertEqual(UserActivityInterval.objects.filter(user_profile=user_profile).count(), 1) interval = UserActivityInterval.objects.get(user_profile=user_profile) self.assertEqual(interval.start, time_zero) self.assertEqual(interval.end, second_time + UserActivityInterval.MIN_INTERVAL_LENGTH) third_time = time_zero + timedelta(seconds=6000) with mock.patch('zerver.views.presence.timezone_now', return_value=third_time): result = self.client_post("/json/users/me/presence", {'status': 'active', 'new_user_input': 'true'}) self.assert_json_success(result) self.assertEqual(UserActivityInterval.objects.filter(user_profile=user_profile).count(), 2) interval = UserActivityInterval.objects.filter(user_profile=user_profile).order_by('start')[0] self.assertEqual(interval.start, time_zero) self.assertEqual(interval.end, second_time + UserActivityInterval.MIN_INTERVAL_LENGTH) interval = UserActivityInterval.objects.filter(user_profile=user_profile).order_by('start')[1] self.assertEqual(interval.start, third_time) self.assertEqual(interval.end, third_time + UserActivityInterval.MIN_INTERVAL_LENGTH) self.assertEqual( seconds_usage_between( user_profile, time_zero, third_time).total_seconds(), 1500) self.assertEqual( seconds_usage_between( user_profile, time_zero, third_time+timedelta(seconds=10)).total_seconds(), 1510) self.assertEqual( seconds_usage_between( user_profile, time_zero, third_time+timedelta(seconds=1000)).total_seconds(), 2400) self.assertEqual( seconds_usage_between( user_profile, time_zero, third_time - timedelta(seconds=100)).total_seconds(), 1500) self.assertEqual( seconds_usage_between( user_profile, time_zero + timedelta(seconds=100), third_time - timedelta(seconds=100)).total_seconds(), 1400) self.assertEqual( seconds_usage_between( user_profile, time_zero + timedelta(seconds=1200), third_time - timedelta(seconds=100)).total_seconds(), 300) # Now test /activity with actual data user_profile.is_staff = True user_profile.save() result = self.client_get('/activity') self.assertEqual(result.status_code, 200) def test_filter_presence_idle_user_ids(self) -> None: user_profile = self.example_user("hamlet") from zerver.lib.actions import filter_presence_idle_user_ids self.login('hamlet') self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), [user_profile.id]) self.client_post("/json/users/me/presence", {'status': 'idle'}) self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), [user_profile.id]) # Active presence from the mobile app doesn't count self.client_post("/json/users/me/presence", {'status': 'active'}, HTTP_USER_AGENT="ZulipMobile/1.0") self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), [user_profile.id]) self.client_post("/json/users/me/presence", {'status': 'active'}) self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), []) def test_no_mit(self) -> None: """Zephyr mirror realms such as MIT never get a list of users""" user = self.mit_user('espuser') self.login_user(user) result = self.client_post("/json/users/me/presence", {'status': 'idle'}, subdomain="zephyr") self.assert_json_success(result) self.assertEqual(result.json()['presences'], {}) def test_mirror_presence(self) -> None: """Zephyr mirror realms find out the status of their mirror bot""" user_profile = self.mit_user('espuser') self.login_user(user_profile) def post_presence() -> Dict[str, Any]: result = self.client_post("/json/users/me/presence", {'status': 'idle'}, subdomain="zephyr") self.assert_json_success(result) json = result.json() return json json = post_presence() self.assertEqual(json['zephyr_mirror_active'], False) self._simulate_mirror_activity_for_user(user_profile) json = post_presence() self.assertEqual(json['zephyr_mirror_active'], True) def _simulate_mirror_activity_for_user(self, user_profile: UserProfile) -> None: last_visit = timezone_now() client = make_client('zephyr_mirror') UserActivity.objects.get_or_create( user_profile=user_profile, client=client, query='get_events', count=2, last_visit=last_visit, ) def test_same_realm(self) -> None: espuser = self.mit_user('espuser') self.login_user(espuser) self.client_post("/json/users/me/presence", {'status': 'idle'}, subdomain="zephyr") self.logout() # Ensure we don't see hamlet@zulip.com information leakage hamlet = self.example_user('hamlet') self.login_user(hamlet) result = self.client_post("/json/users/me/presence", {'status': 'idle'}) self.assert_json_success(result) json = result.json() self.assertEqual(json['presences'][hamlet.email]["website"]['status'], 'idle') self.assertEqual( json['presences'].keys(), {hamlet.email}, ) class SingleUserPresenceTests(ZulipTestCase): def test_email_access(self) -> None: user = self.example_user('hamlet') self.login_user(user) other_user = self.example_user('othello') other_user.email = 'email@zulip.com' other_user.delivery_email = 'delivery_email@zulip.com' other_user.save() # Note that we don't leak any info on delivery emails. result = self.client_get('/json/users/delivery_email@zulip.com/presence') self.assert_json_error(result, 'No such user') result = self.client_get('/json/users/not_even_in_realm@zulip.com/presence') self.assert_json_error(result, 'No such user') # For a known email, we may simply complain about lack of presence info. result = self.client_get("/json/users/email@zulip.com/presence") self.assert_json_error(result, 'No presence data for email@zulip.com') def test_single_user_get(self) -> None: reset_emails_in_zulip_realm() # First, we setup the test with some data user = self.example_user("othello") self.login_user(user) result = self.client_post("/json/users/me/presence", {'status': 'active'}) result = self.client_post("/json/users/me/presence", {'status': 'active'}, HTTP_USER_AGENT="ZulipDesktop/1.0") result = self.api_post(user, "/api/v1/users/me/presence", {'status': 'idle'}, HTTP_USER_AGENT="ZulipAndroid/1.0") self.assert_json_success(result) # Check some error conditions result = self.client_get("/json/users/nonexistence@zulip.com/presence") self.assert_json_error(result, "No such user") result = self.client_get("/json/users/cordelia@zulip.com/presence") self.assert_json_error(result, "No presence data for cordelia@zulip.com") do_deactivate_user(self.example_user('cordelia')) result = self.client_get("/json/users/cordelia@zulip.com/presence") self.assert_json_error(result, "No such user") result = self.client_get("/json/users/default-bot@zulip.com/presence") self.assert_json_error(result, "Presence is not supported for bot users.") sipbtest = self.mit_user('sipbtest') self.login_user(sipbtest) result = self.client_get("/json/users/othello@zulip.com/presence", subdomain="zephyr") self.assert_json_error(result, "No such user") # Then, we check everything works self.login('hamlet') result = self.client_get("/json/users/othello@zulip.com/presence") result_dict = result.json() self.assertEqual( set(result_dict['presence'].keys()), {"ZulipAndroid", "website", "aggregated"}) self.assertEqual(set(result_dict['presence']['website'].keys()), {"status", "timestamp"}) def test_ping_only(self) -> None: self.login('othello') req = dict( status='active', ping_only='true', ) result = self.client_post("/json/users/me/presence", req) self.assertEqual(result.json()['msg'], '') class UserPresenceAggregationTests(ZulipTestCase): def _send_presence_for_aggregated_tests(self, user: UserProfile, status: str, validate_time: datetime.datetime) -> Dict[str, Dict[str, Any]]: self.login_user(user) timezone_util = 'zerver.views.presence.timezone_now' with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=5)): self.client_post("/json/users/me/presence", {'status': status}) with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=2)): self.api_post(user, "/api/v1/users/me/presence", {'status': status}, HTTP_USER_AGENT="ZulipAndroid/1.0") with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=7)): latest_result = self.api_post(user, "/api/v1/users/me/presence", {'status': status}, HTTP_USER_AGENT="ZulipIOS/1.0") latest_result_dict = latest_result.json() self.assertDictEqual( latest_result_dict['presences'][user.email]['aggregated'], { 'status': status, 'timestamp': datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)), 'client': 'ZulipAndroid', }, ) result = self.client_get(f"/json/users/{user.email}/presence") return result.json() def test_aggregated_info(self) -> None: user = self.example_user("othello") validate_time = timezone_now() self._send_presence_for_aggregated_tests(user, 'active', validate_time) with mock.patch('zerver.views.presence.timezone_now', return_value=validate_time - datetime.timedelta(seconds=1)): result = self.api_post(user, "/api/v1/users/me/presence", {'status': 'active'}, HTTP_USER_AGENT="ZulipTestDev/1.0") result_dict = result.json() self.assertDictEqual( result_dict['presences'][user.email]['aggregated'], { 'status': 'active', 'timestamp': datetime_to_timestamp(validate_time - datetime.timedelta(seconds=1)), 'client': 'ZulipTestDev', }, ) def test_aggregated_presense_active(self) -> None: user = self.example_user('othello') validate_time = timezone_now() result_dict = self._send_presence_for_aggregated_tests(user, 'active', validate_time) self.assertDictEqual( result_dict['presence']['aggregated'], { "status": "active", "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)), }, ) def test_aggregated_presense_idle(self) -> None: user = self.example_user('othello') validate_time = timezone_now() result_dict = self._send_presence_for_aggregated_tests(user, 'idle', validate_time) self.assertDictEqual( result_dict['presence']['aggregated'], { "status": "idle", "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)), }, ) def test_aggregated_presense_mixed(self) -> None: user = self.example_user("othello") self.login_user(user) validate_time = timezone_now() with mock.patch('zerver.views.presence.timezone_now', return_value=validate_time - datetime.timedelta(seconds=3)): self.api_post(user, "/api/v1/users/me/presence", {'status': 'active'}, HTTP_USER_AGENT="ZulipTestDev/1.0") result_dict = self._send_presence_for_aggregated_tests(user, 'idle', validate_time) self.assertDictEqual( result_dict['presence']['aggregated'], { "status": "idle", "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)), }, ) def test_aggregated_presense_offline(self) -> None: user = self.example_user("othello") self.login_user(user) validate_time = timezone_now() with self.settings(OFFLINE_THRESHOLD_SECS=1): result_dict = self._send_presence_for_aggregated_tests(user, 'idle', validate_time) self.assertDictEqual( result_dict['presence']['aggregated'], { "status": "offline", "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)), }, ) class GetRealmStatusesTest(ZulipTestCase): def test_get_statuses(self) -> None: # Setup the test by simulating users reporting their presence data. othello = self.example_user("othello") hamlet = self.example_user("hamlet") result = self.api_post(othello, "/api/v1/users/me/presence", dict(status='active'), HTTP_USER_AGENT="ZulipAndroid/1.0") result = self.api_post(hamlet, "/api/v1/users/me/presence", dict(status='idle'), HTTP_USER_AGENT="ZulipDesktop/1.0") self.assert_json_success(result) json = result.json() self.assertEqual(set(json['presences'].keys()), {hamlet.email, othello.email}) result = self.api_post(hamlet, "/api/v1/users/me/presence", dict(status='active', slim_presence='true'), HTTP_USER_AGENT="ZulipDesktop/1.0") self.assert_json_success(result) json = result.json() self.assertEqual(set(json['presences'].keys()), {str(hamlet.id), str(othello.id)}) # Check that a bot can fetch the presence data for the realm. result = self.api_get(self.example_user("default_bot"), "/api/v1/realm/presence") self.assert_json_success(result) json = result.json() self.assertEqual(set(json['presences'].keys()), {hamlet.email, othello.email}) def test_presence_disabled(self) -> None: # Disable presence status and test whether the presence # is reported or not. othello = self.example_user("othello") hamlet = self.example_user("hamlet") othello.presence_enabled = False hamlet.presence_enabled = True othello.save(update_fields=['presence_enabled']) hamlet.save(update_fields=['presence_enabled']) result = self.api_post(othello, "/api/v1/users/me/presence", dict(status='active'), HTTP_USER_AGENT="ZulipAndroid/1.0") result = self.api_post(hamlet, "/api/v1/users/me/presence", dict(status='idle'), HTTP_USER_AGENT="ZulipDesktop/1.0") self.assert_json_success(result) json = result.json() # Othello's presence status is disabled so it won't be reported. self.assertEqual(set(json['presences'].keys()), {hamlet.email}) result = self.api_post(hamlet, "/api/v1/users/me/presence", dict(status='active', slim_presence='true'), HTTP_USER_AGENT="ZulipDesktop/1.0") self.assert_json_success(result) json = result.json() self.assertEqual(set(json['presences'].keys()), {str(hamlet.id)})