import datetime import os import re from datetime import timedelta from typing import Any, Dict, List, Union from unittest import mock import orjson from django.conf import settings from django.utils.timezone import now as timezone_now from confirmation.models import Confirmation, create_confirmation_link from zerver.actions.create_realm import do_change_realm_subdomain, do_create_realm from zerver.actions.message_send import ( internal_send_huddle_message, internal_send_private_message, internal_send_stream_message, ) from zerver.actions.realm_settings import ( do_add_deactivated_redirect, do_change_realm_org_type, do_change_realm_plan_type, do_deactivate_realm, do_delete_all_realm_attachments, do_reactivate_realm, do_scrub_realm, do_send_realm_reactivation_email, do_set_realm_property, do_set_realm_user_default_setting, ) from zerver.actions.streams import do_deactivate_stream, merge_streams from zerver.lib.realm_description import get_realm_rendered_description, get_realm_text_description from zerver.lib.send_email import send_future_email from zerver.lib.streams import create_stream_if_needed from zerver.lib.test_classes import ZulipTestCase from zerver.lib.upload import delete_message_attachments, upload_message_attachment from zerver.models import ( Attachment, CustomProfileField, Message, Realm, RealmAuditLog, RealmReactivationStatus, RealmUserDefault, ScheduledEmail, Stream, UserGroup, UserGroupMembership, UserMessage, UserProfile, get_realm, get_stream, get_system_bot, get_user_profile_by_id, ) class RealmTest(ZulipTestCase): def assert_user_profile_cache_gets_new_name( self, user_profile: UserProfile, new_realm_name: str ) -> None: self.assertEqual(user_profile.realm.name, new_realm_name) def test_realm_creation_ensures_internal_realms(self) -> None: with mock.patch("zerver.actions.create_realm.server_initialized", return_value=False): with mock.patch( "zerver.actions.create_realm.create_internal_realm" ) as mock_create_internal, self.assertLogs(level="INFO") as info_logs: do_create_realm("testrealm", "Test Realm") mock_create_internal.assert_called_once() self.assertEqual( info_logs.output, ["INFO:root:Server not yet initialized. Creating the internal realm first."], ) def test_realm_creation_on_social_auth_subdomain_disallowed(self) -> None: with self.settings(SOCIAL_AUTH_SUBDOMAIN="zulipauth"): with self.assertRaises(AssertionError): do_create_realm("zulipauth", "Test Realm") def test_permission_for_education_non_profit_organization(self) -> None: realm = do_create_realm( "test_education_non_profit", "education_org_name", org_type=Realm.ORG_TYPES["education_nonprofit"]["id"], ) self.assertEqual(realm.create_public_stream_policy, Realm.POLICY_ADMINS_ONLY) self.assertEqual(realm.create_private_stream_policy, Realm.POLICY_MEMBERS_ONLY) self.assertEqual(realm.invite_to_realm_policy, Realm.POLICY_ADMINS_ONLY) self.assertEqual(realm.move_messages_between_streams_policy, Realm.POLICY_MODERATORS_ONLY) self.assertEqual(realm.user_group_edit_policy, Realm.POLICY_MODERATORS_ONLY) self.assertEqual(realm.invite_to_stream_policy, Realm.POLICY_MODERATORS_ONLY) def test_permission_for_education_for_profit_organization(self) -> None: realm = do_create_realm( "test_education_for_profit", "education_org_name", org_type=Realm.ORG_TYPES["education"]["id"], ) self.assertEqual(realm.create_public_stream_policy, Realm.POLICY_ADMINS_ONLY) self.assertEqual(realm.create_private_stream_policy, Realm.POLICY_MEMBERS_ONLY) self.assertEqual(realm.invite_to_realm_policy, Realm.POLICY_ADMINS_ONLY) self.assertEqual(realm.move_messages_between_streams_policy, Realm.POLICY_MODERATORS_ONLY) self.assertEqual(realm.user_group_edit_policy, Realm.POLICY_MODERATORS_ONLY) self.assertEqual(realm.invite_to_stream_policy, Realm.POLICY_MODERATORS_ONLY) def test_realm_enable_spectator_access(self) -> None: realm = do_create_realm( "test_web_public_true", "Foo", plan_type=Realm.PLAN_TYPE_STANDARD, enable_spectator_access=True, ) self.assertEqual(realm.enable_spectator_access, True) realm = do_create_realm("test_web_public_false", "Boo", enable_spectator_access=False) self.assertEqual(realm.enable_spectator_access, False) with self.assertRaises(AssertionError): realm = do_create_realm("test_web_public_false_1", "Foo", enable_spectator_access=True) with self.assertRaises(AssertionError): realm = do_create_realm( "test_web_public_false_2", "Foo", plan_type=Realm.PLAN_TYPE_LIMITED, enable_spectator_access=True, ) def test_do_set_realm_name_caching(self) -> None: """The main complicated thing about setting realm names is fighting the cache, and we start by populating the cache for Hamlet, and we end by checking the cache to ensure that the new value is there.""" realm = get_realm("zulip") new_name = "Zed You Elle Eye Pea" do_set_realm_property(realm, "name", new_name, acting_user=None) self.assertEqual(get_realm(realm.string_id).name, new_name) self.assert_user_profile_cache_gets_new_name(self.example_user("hamlet"), new_name) def test_update_realm_name_events(self) -> None: realm = get_realm("zulip") new_name = "Puliz" with self.capture_send_event_calls(expected_num_events=1) as events: do_set_realm_property(realm, "name", new_name, acting_user=None) event = events[0]["event"] self.assertEqual( event, dict( type="realm", op="update", property="name", value=new_name, ), ) def test_update_realm_description_events(self) -> None: realm = get_realm("zulip") new_description = "zulip dev group" with self.capture_send_event_calls(expected_num_events=1) as events: do_set_realm_property(realm, "description", new_description, acting_user=None) event = events[0]["event"] self.assertEqual( event, dict( type="realm", op="update", property="description", value=new_description, ), ) def test_update_realm_description(self) -> None: self.login("iago") new_description = "zulip dev group" data = dict(description=new_description) with self.capture_send_event_calls(expected_num_events=1) as events: result = self.client_patch("/json/realm", data) self.assert_json_success(result) realm = get_realm("zulip") self.assertEqual(realm.description, new_description) event = events[0]["event"] self.assertEqual( event, dict( type="realm", op="update", property="description", value=new_description, ), ) def test_realm_description_length(self) -> None: new_description = "A" * 1001 data = dict(description=new_description) # create an admin user self.login("iago") result = self.client_patch("/json/realm", data) self.assert_json_error(result, "description is too long (limit: 1000 characters)") realm = get_realm("zulip") self.assertNotEqual(realm.description, new_description) def test_realm_convert_demo_realm(self) -> None: data = dict(string_id="coolrealm") self.login("iago") result = self.client_patch("/json/realm", data) self.assert_json_error(result, "Must be an organization owner") self.login("desdemona") result = self.client_patch("/json/realm", data) self.assert_json_error(result, "Must be a demo organization.") data = dict(string_id="lear") self.login("desdemona") realm = get_realm("zulip") realm.demo_organization_scheduled_deletion_date = timezone_now() + datetime.timedelta( days=30 ) realm.save() result = self.client_patch("/json/realm", data) self.assert_json_error(result, "Subdomain unavailable. Please choose a different one.") # Now try to change the string_id to something available. data = dict(string_id="coolrealm") result = self.client_patch("/json/realm", data) self.assert_json_success(result) json = orjson.loads(result.content) self.assertEqual(json["realm_uri"], "http://coolrealm.testserver") realm = get_realm("coolrealm") self.assertIsNone(realm.demo_organization_scheduled_deletion_date) self.assertEqual(realm.string_id, data["string_id"]) def test_realm_name_length(self) -> None: new_name = "A" * (Realm.MAX_REALM_NAME_LENGTH + 1) data = dict(name=new_name) # create an admin user self.login("iago") result = self.client_patch("/json/realm", data) self.assert_json_error(result, "name is too long (limit: 40 characters)") realm = get_realm("zulip") self.assertNotEqual(realm.name, new_name) def test_admin_restrictions_for_changing_realm_name(self) -> None: new_name = "Mice will play while the cat is away" self.login("othello") req = dict(name=new_name) result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Must be an organization administrator") def test_unauthorized_name_change(self) -> None: data = {"full_name": "Sir Hamlet"} user_profile = self.example_user("hamlet") self.login_user(user_profile) do_set_realm_property(user_profile.realm, "name_changes_disabled", True, acting_user=None) url = "/json/settings" result = self.client_patch(url, data) self.assertEqual(result.status_code, 200) # Since the setting fails silently, no message is returned self.assert_in_response("", result) # Realm admins can change their name even setting is disabled. data = {"full_name": "New Iago"} self.login("iago") url = "/json/settings" result = self.client_patch(url, data) self.assert_json_success(result) def test_do_deactivate_realm_clears_user_realm_cache(self) -> None: """The main complicated thing about deactivating realm names is updating the cache, and we start by populating the cache for Hamlet, and we end by checking the cache to ensure that his realm appears to be deactivated. You can make this test fail by disabling cache.flush_realm().""" hamlet_id = self.example_user("hamlet").id get_user_profile_by_id(hamlet_id) realm = get_realm("zulip") do_deactivate_realm(realm, acting_user=None) user = get_user_profile_by_id(hamlet_id) self.assertTrue(user.realm.deactivated) def test_do_change_realm_delete_clears_user_realm_cache(self) -> None: hamlet_id = self.example_user("hamlet").id get_user_profile_by_id(hamlet_id) realm = get_realm("zulip") realm.delete() with self.assertRaises(UserProfile.DoesNotExist): get_user_profile_by_id(hamlet_id) def test_do_change_realm_subdomain_clears_user_realm_cache(self) -> None: """The main complicated thing about changing realm subdomains is updating the cache, and we start by populating the cache for Hamlet, and we end by checking the cache to ensure that his realm appears to be deactivated. You can make this test fail by disabling cache.flush_realm().""" hamlet_id = self.example_user("hamlet").id user = get_user_profile_by_id(hamlet_id) realm = get_realm("zulip") iago = self.example_user("iago") do_change_realm_subdomain(realm, "newzulip", acting_user=iago) user = get_user_profile_by_id(hamlet_id) self.assertEqual(user.realm.string_id, "newzulip") placeholder_realm = get_realm("zulip") self.assertTrue(placeholder_realm.deactivated) self.assertEqual(placeholder_realm.deactivated_redirect, user.realm.uri) realm_audit_log = RealmAuditLog.objects.filter( event_type=RealmAuditLog.REALM_SUBDOMAIN_CHANGED, acting_user=iago ).last() assert realm_audit_log is not None expected_extra_data = {"old_subdomain": "zulip", "new_subdomain": "newzulip"} self.assertEqual(realm_audit_log.extra_data, str(expected_extra_data)) self.assertEqual(realm_audit_log.acting_user, iago) def test_do_deactivate_realm_clears_scheduled_jobs(self) -> None: user = self.example_user("hamlet") send_future_email( "zerver/emails/followup_day1", user.realm, to_user_ids=[user.id], delay=datetime.timedelta(hours=1), ) self.assertEqual(ScheduledEmail.objects.count(), 1) do_deactivate_realm(user.realm, acting_user=None) self.assertEqual(ScheduledEmail.objects.count(), 0) def test_do_change_realm_description_clears_cached_descriptions(self) -> None: realm = get_realm("zulip") rendered_description = get_realm_rendered_description(realm) text_description = get_realm_text_description(realm) realm.description = "New description" realm.save(update_fields=["description"]) new_rendered_description = get_realm_rendered_description(realm) self.assertNotEqual(rendered_description, new_rendered_description) self.assertIn(realm.description, new_rendered_description) new_text_description = get_realm_text_description(realm) self.assertNotEqual(text_description, new_text_description) self.assertEqual(realm.description, new_text_description) def test_do_deactivate_realm_on_deactivated_realm(self) -> None: """Ensure early exit is working in realm deactivation""" realm = get_realm("zulip") self.assertFalse(realm.deactivated) do_deactivate_realm(realm, acting_user=None) self.assertTrue(realm.deactivated) do_deactivate_realm(realm, acting_user=None) self.assertTrue(realm.deactivated) def test_do_set_deactivated_redirect_on_deactivated_realm(self) -> None: """Ensure that the redirect url is working when deactivating realm""" realm = get_realm("zulip") redirect_url = "new_server.zulip.com" do_deactivate_realm(realm, acting_user=None) self.assertTrue(realm.deactivated) do_add_deactivated_redirect(realm, redirect_url) self.assertEqual(realm.deactivated_redirect, redirect_url) new_redirect_url = "test.zulip.com" do_add_deactivated_redirect(realm, new_redirect_url) self.assertEqual(realm.deactivated_redirect, new_redirect_url) self.assertNotEqual(realm.deactivated_redirect, redirect_url) def test_do_reactivate_realm(self) -> None: realm = get_realm("zulip") do_deactivate_realm(realm, acting_user=None) self.assertTrue(realm.deactivated) do_reactivate_realm(realm) self.assertFalse(realm.deactivated) log_entry = RealmAuditLog.objects.last() assert log_entry is not None self.assertEqual(log_entry.realm, realm) self.assertEqual(log_entry.event_type, RealmAuditLog.REALM_REACTIVATED) log_entry_id = log_entry.id with self.assertLogs(level="WARNING") as m: # do_reactivate_realm on a realm that's not deactivated should be a noop. do_reactivate_realm(realm) self.assertEqual( m.output, [f"WARNING:root:Realm {realm.id} cannot be reactivated because it is already active."], ) self.assertFalse(realm.deactivated) latest_log_entry = RealmAuditLog.objects.last() assert latest_log_entry is not None self.assertEqual(latest_log_entry.id, log_entry_id) def test_realm_reactivation_link(self) -> None: realm = get_realm("zulip") do_deactivate_realm(realm, acting_user=None) self.assertTrue(realm.deactivated) obj = RealmReactivationStatus.objects.create(realm=realm) confirmation_url = create_confirmation_link(obj, Confirmation.REALM_REACTIVATION) response = self.client_get(confirmation_url) self.assert_in_success_response( ["Your organization has been successfully reactivated"], response ) realm = get_realm("zulip") self.assertFalse(realm.deactivated) # Make sure the link can't be reused. do_deactivate_realm(realm, acting_user=None) response = self.client_get(confirmation_url) self.assertEqual(response.status_code, 404) def test_realm_reactivation_confirmation_object(self) -> None: realm = get_realm("zulip") do_deactivate_realm(realm, acting_user=None) self.assertTrue(realm.deactivated) obj = RealmReactivationStatus.objects.create(realm=realm) create_confirmation_link(obj, Confirmation.REALM_REACTIVATION) confirmation = Confirmation.objects.last() assert confirmation is not None self.assertEqual(confirmation.content_object, obj) self.assertEqual(confirmation.realm, realm) def test_do_send_realm_reactivation_email(self) -> None: realm = get_realm("zulip") do_deactivate_realm(realm, acting_user=None) self.assertEqual(realm.deactivated, True) iago = self.example_user("iago") do_send_realm_reactivation_email(realm, acting_user=iago) from django.core.mail import outbox self.assert_length(outbox, 1) self.assertEqual(self.email_envelope_from(outbox[0]), settings.NOREPLY_EMAIL_ADDRESS) self.assertRegex( self.email_display_from(outbox[0]), rf"^Zulip Account Security <{self.TOKENIZED_NOREPLY_REGEX}>\Z", ) self.assertIn("Reactivate your Zulip organization", outbox[0].subject) self.assertIn("Dear former administrators", outbox[0].body) admins = realm.get_human_admin_users() confirmation_url = self.get_confirmation_url_from_outbox(admins[0].delivery_email) response = self.client_get(confirmation_url) self.assert_in_success_response( ["Your organization has been successfully reactivated"], response ) realm = get_realm("zulip") self.assertFalse(realm.deactivated) self.assertEqual( RealmAuditLog.objects.filter( event_type=RealmAuditLog.REALM_REACTIVATION_EMAIL_SENT, acting_user=iago ).count(), 1, ) def test_realm_reactivation_with_random_link(self) -> None: random_link = "/reactivate/5e89081eb13984e0f3b130bf7a4121d153f1614b" response = self.client_get(random_link) self.assertEqual(response.status_code, 404) self.assert_in_response( "The organization reactivation link has expired or is not valid.", response ) def test_change_notifications_stream(self) -> None: # We need an admin user. self.login("iago") disabled_notif_stream_id = -1 req = dict(notifications_stream_id=orjson.dumps(disabled_notif_stream_id).decode()) result = self.client_patch("/json/realm", req) self.assert_json_success(result) realm = get_realm("zulip") self.assertEqual(realm.notifications_stream, None) new_notif_stream_id = Stream.objects.get(name="Denmark").id req = dict(notifications_stream_id=orjson.dumps(new_notif_stream_id).decode()) result = self.client_patch("/json/realm", req) self.assert_json_success(result) realm = get_realm("zulip") assert realm.notifications_stream is not None self.assertEqual(realm.notifications_stream.id, new_notif_stream_id) invalid_notif_stream_id = 1234 req = dict(notifications_stream_id=orjson.dumps(invalid_notif_stream_id).decode()) result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Invalid stream ID") realm = get_realm("zulip") assert realm.notifications_stream is not None self.assertNotEqual(realm.notifications_stream.id, invalid_notif_stream_id) def test_get_default_notifications_stream(self) -> None: realm = get_realm("zulip") verona = get_stream("verona", realm) notifications_stream = realm.get_notifications_stream() assert notifications_stream is not None self.assertEqual(notifications_stream.id, verona.id) do_deactivate_stream(notifications_stream, acting_user=None) self.assertIsNone(realm.get_notifications_stream()) def test_merge_streams(self) -> None: realm = get_realm("zulip") denmark = get_stream("Denmark", realm) cordelia = self.example_user("cordelia") notifications_stream = realm.get_notifications_stream() assert notifications_stream is not None create_stream_if_needed(realm, "Atlantis") self.subscribe(cordelia, "Atlantis") self.send_stream_message(cordelia, "Atlantis") atlantis = get_stream("Atlantis", realm) stats = merge_streams(realm, denmark, denmark) self.assertEqual(stats, (0, 0, 0)) stats = merge_streams(realm, denmark, atlantis) self.assertEqual(stats, (1, 1, 1)) with self.assertRaises(Stream.DoesNotExist): get_stream("Atlantis", realm) stats = merge_streams(realm, denmark, notifications_stream) self.assertEqual(stats, (2, 1, 10)) self.assertIsNone(realm.get_notifications_stream()) def test_change_signup_notifications_stream(self) -> None: # We need an admin user. self.login("iago") disabled_signup_notifications_stream_id = -1 req = dict( signup_notifications_stream_id=orjson.dumps( disabled_signup_notifications_stream_id ).decode() ) result = self.client_patch("/json/realm", req) self.assert_json_success(result) realm = get_realm("zulip") self.assertEqual(realm.signup_notifications_stream, None) new_signup_notifications_stream_id = Stream.objects.get(name="Denmark").id req = dict( signup_notifications_stream_id=orjson.dumps(new_signup_notifications_stream_id).decode() ) result = self.client_patch("/json/realm", req) self.assert_json_success(result) realm = get_realm("zulip") assert realm.signup_notifications_stream is not None self.assertEqual(realm.signup_notifications_stream.id, new_signup_notifications_stream_id) invalid_signup_notifications_stream_id = 1234 req = dict( signup_notifications_stream_id=orjson.dumps( invalid_signup_notifications_stream_id ).decode() ) result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Invalid stream ID") realm = get_realm("zulip") assert realm.signup_notifications_stream is not None self.assertNotEqual( realm.signup_notifications_stream.id, invalid_signup_notifications_stream_id ) def test_get_default_signup_notifications_stream(self) -> None: realm = get_realm("zulip") verona = get_stream("verona", realm) realm.signup_notifications_stream = verona realm.save(update_fields=["signup_notifications_stream"]) signup_notifications_stream = realm.get_signup_notifications_stream() assert signup_notifications_stream is not None self.assertEqual(signup_notifications_stream, verona) do_deactivate_stream(signup_notifications_stream, acting_user=None) self.assertIsNone(realm.get_signup_notifications_stream()) def test_change_realm_default_language(self) -> None: # we need an admin user. self.login("iago") # Test to make sure that when invalid languages are passed # as the default realm language, correct validation error is # raised and the invalid language is not saved in db invalid_lang = "invalid_lang" req = dict(default_language=invalid_lang) result = self.client_patch("/json/realm", req) self.assert_json_error(result, f"Invalid language '{invalid_lang}'") realm = get_realm("zulip") self.assertNotEqual(realm.default_language, invalid_lang) def test_deactivate_realm_by_owner(self) -> None: self.login("desdemona") realm = get_realm("zulip") self.assertFalse(realm.deactivated) result = self.client_post("/json/realm/deactivate") self.assert_json_success(result) realm = get_realm("zulip") self.assertTrue(realm.deactivated) def test_deactivate_realm_by_non_owner(self) -> None: self.login("iago") realm = get_realm("zulip") self.assertFalse(realm.deactivated) result = self.client_post("/json/realm/deactivate") self.assert_json_error(result, "Must be an organization owner") realm = get_realm("zulip") self.assertFalse(realm.deactivated) def test_invalid_integer_attribute_values(self) -> None: integer_values = [key for key, value in Realm.property_types.items() if value is int] invalid_values = dict( bot_creation_policy=10, create_public_stream_policy=10, create_private_stream_policy=10, create_web_public_stream_policy=10, invite_to_stream_policy=10, message_retention_days=10, video_chat_provider=10, giphy_rating=10, waiting_period_threshold=-10, digest_weekday=10, user_group_edit_policy=10, private_message_policy=10, message_content_delete_limit_seconds=-10, wildcard_mention_policy=10, invite_to_realm_policy=10, move_messages_between_streams_policy=10, add_custom_emoji_policy=10, delete_own_message_policy=10, edit_topic_policy=10, message_content_edit_limit_seconds=0, move_messages_within_stream_limit_seconds=0, move_messages_between_streams_limit_seconds=0, ) # We need an admin user. self.login("iago") for name in integer_values: invalid_value = invalid_values.get(name) if invalid_value is None: raise AssertionError(f"No test created for {name}") self.do_test_invalid_integer_attribute_value(name, invalid_value) def do_test_invalid_integer_attribute_value(self, val_name: str, invalid_val: int) -> None: possible_messages = { f"Invalid {val_name}", f"Bad value for '{val_name}'", f"Bad value for '{val_name}': {invalid_val}", f"Invalid {val_name} {invalid_val}", } req = {val_name: invalid_val} result = self.client_patch("/json/realm", req) msg = self.get_json_error(result) self.assertTrue(msg in possible_messages) def test_change_video_chat_provider(self) -> None: self.assertEqual( get_realm("zulip").video_chat_provider, Realm.VIDEO_CHAT_PROVIDERS["jitsi_meet"]["id"] ) self.login("iago") invalid_video_chat_provider_value = 10 req = {"video_chat_provider": orjson.dumps(invalid_video_chat_provider_value).decode()} result = self.client_patch("/json/realm", req) self.assert_json_error( result, f"Invalid video_chat_provider {invalid_video_chat_provider_value}" ) req = { "video_chat_provider": orjson.dumps( Realm.VIDEO_CHAT_PROVIDERS["disabled"]["id"] ).decode() } result = self.client_patch("/json/realm", req) self.assert_json_success(result) self.assertEqual( get_realm("zulip").video_chat_provider, Realm.VIDEO_CHAT_PROVIDERS["disabled"]["id"] ) req = { "video_chat_provider": orjson.dumps( Realm.VIDEO_CHAT_PROVIDERS["jitsi_meet"]["id"] ).decode() } result = self.client_patch("/json/realm", req) self.assert_json_success(result) self.assertEqual( get_realm("zulip").video_chat_provider, Realm.VIDEO_CHAT_PROVIDERS["jitsi_meet"]["id"] ) req = { "video_chat_provider": orjson.dumps( Realm.VIDEO_CHAT_PROVIDERS["big_blue_button"]["id"] ).decode() } result = self.client_patch("/json/realm", req) self.assert_json_success(result) self.assertEqual( get_realm("zulip").video_chat_provider, Realm.VIDEO_CHAT_PROVIDERS["big_blue_button"]["id"], ) req = { "video_chat_provider": orjson.dumps(Realm.VIDEO_CHAT_PROVIDERS["zoom"]["id"]).decode() } result = self.client_patch("/json/realm", req) self.assert_json_success(result) def test_initial_plan_type(self) -> None: with self.settings(BILLING_ENABLED=True): self.assertEqual(do_create_realm("hosted", "hosted").plan_type, Realm.PLAN_TYPE_LIMITED) self.assertEqual( get_realm("hosted").max_invites, settings.INVITES_DEFAULT_REALM_DAILY_MAX ) self.assertEqual( get_realm("hosted").message_visibility_limit, Realm.MESSAGE_VISIBILITY_LIMITED ) self.assertEqual(get_realm("hosted").upload_quota_gb, Realm.UPLOAD_QUOTA_LIMITED) with self.settings(BILLING_ENABLED=False): self.assertEqual( do_create_realm("onpremise", "onpremise").plan_type, Realm.PLAN_TYPE_SELF_HOSTED ) self.assertEqual( get_realm("onpremise").max_invites, settings.INVITES_DEFAULT_REALM_DAILY_MAX ) self.assertEqual(get_realm("onpremise").message_visibility_limit, None) self.assertEqual(get_realm("onpremise").upload_quota_gb, None) def test_change_org_type(self) -> None: realm = get_realm("zulip") iago = self.example_user("iago") self.assertEqual(realm.org_type, Realm.ORG_TYPES["business"]["id"]) do_change_realm_org_type(realm, Realm.ORG_TYPES["government"]["id"], acting_user=iago) realm = get_realm("zulip") realm_audit_log = RealmAuditLog.objects.filter( event_type=RealmAuditLog.REALM_ORG_TYPE_CHANGED ).last() assert realm_audit_log is not None expected_extra_data = { "old_value": Realm.ORG_TYPES["business"]["id"], "new_value": Realm.ORG_TYPES["government"]["id"], } self.assertEqual(realm_audit_log.extra_data, str(expected_extra_data)) self.assertEqual(realm_audit_log.acting_user, iago) self.assertEqual(realm.org_type, Realm.ORG_TYPES["government"]["id"]) def test_change_realm_plan_type(self) -> None: realm = get_realm("zulip") iago = self.example_user("iago") self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_SELF_HOSTED) self.assertEqual(realm.max_invites, settings.INVITES_DEFAULT_REALM_DAILY_MAX) self.assertEqual(realm.message_visibility_limit, None) self.assertEqual(realm.upload_quota_gb, None) do_change_realm_plan_type(realm, Realm.PLAN_TYPE_STANDARD, acting_user=iago) realm = get_realm("zulip") realm_audit_log = RealmAuditLog.objects.filter( event_type=RealmAuditLog.REALM_PLAN_TYPE_CHANGED ).last() assert realm_audit_log is not None expected_extra_data = { "old_value": Realm.PLAN_TYPE_SELF_HOSTED, "new_value": Realm.PLAN_TYPE_STANDARD, } self.assertEqual(realm_audit_log.extra_data, str(expected_extra_data)) self.assertEqual(realm_audit_log.acting_user, iago) self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_STANDARD) self.assertEqual(realm.max_invites, Realm.INVITES_STANDARD_REALM_DAILY_MAX) self.assertEqual(realm.message_visibility_limit, None) self.assertEqual(realm.upload_quota_gb, Realm.UPLOAD_QUOTA_STANDARD) do_set_realm_property(realm, "enable_spectator_access", True, acting_user=None) do_change_realm_plan_type(realm, Realm.PLAN_TYPE_LIMITED, acting_user=iago) realm = get_realm("zulip") self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_LIMITED) self.assertEqual(realm.max_invites, settings.INVITES_DEFAULT_REALM_DAILY_MAX) self.assertEqual(realm.message_visibility_limit, Realm.MESSAGE_VISIBILITY_LIMITED) self.assertEqual(realm.upload_quota_gb, Realm.UPLOAD_QUOTA_LIMITED) self.assertFalse(realm.enable_spectator_access) do_change_realm_plan_type(realm, Realm.PLAN_TYPE_STANDARD_FREE, acting_user=iago) realm = get_realm("zulip") self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_STANDARD_FREE) self.assertEqual(realm.max_invites, Realm.INVITES_STANDARD_REALM_DAILY_MAX) self.assertEqual(realm.message_visibility_limit, None) self.assertEqual(realm.upload_quota_gb, Realm.UPLOAD_QUOTA_STANDARD) do_change_realm_plan_type(realm, Realm.PLAN_TYPE_LIMITED, acting_user=iago) do_change_realm_plan_type(realm, Realm.PLAN_TYPE_PLUS, acting_user=iago) realm = get_realm("zulip") self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_PLUS) self.assertEqual(realm.max_invites, Realm.INVITES_STANDARD_REALM_DAILY_MAX) self.assertEqual(realm.message_visibility_limit, None) self.assertEqual(realm.upload_quota_gb, Realm.UPLOAD_QUOTA_STANDARD) do_change_realm_plan_type(realm, Realm.PLAN_TYPE_SELF_HOSTED, acting_user=iago) self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_SELF_HOSTED) self.assertEqual(realm.max_invites, settings.INVITES_DEFAULT_REALM_DAILY_MAX) self.assertEqual(realm.message_visibility_limit, None) self.assertEqual(realm.upload_quota_gb, None) def test_message_retention_days(self) -> None: self.login("iago") realm = get_realm("zulip") self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_SELF_HOSTED) req = dict(message_retention_days=orjson.dumps(10).decode()) result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Must be an organization owner") self.login("desdemona") req = dict(message_retention_days=orjson.dumps(0).decode()) result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Bad value for 'message_retention_days': 0") req = dict(message_retention_days=orjson.dumps(-10).decode()) result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Bad value for 'message_retention_days': -10") req = dict(message_retention_days=orjson.dumps("invalid").decode()) result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Bad value for 'message_retention_days': invalid") req = dict(message_retention_days=orjson.dumps(-1).decode()) result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Bad value for 'message_retention_days': -1") req = dict(message_retention_days=orjson.dumps("unlimited").decode()) result = self.client_patch("/json/realm", req) self.assert_json_success(result) req = dict(message_retention_days=orjson.dumps(10).decode()) result = self.client_patch("/json/realm", req) self.assert_json_success(result) do_change_realm_plan_type(realm, Realm.PLAN_TYPE_LIMITED, acting_user=None) req = dict(message_retention_days=orjson.dumps(10).decode()) result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Available on Zulip Cloud Standard. Upgrade to access.") do_change_realm_plan_type(realm, Realm.PLAN_TYPE_STANDARD, acting_user=None) req = dict(message_retention_days=orjson.dumps(10).decode()) result = self.client_patch("/json/realm", req) self.assert_json_success(result) def test_do_create_realm(self) -> None: realm = do_create_realm("realm_string_id", "realm name") self.assertEqual(realm.string_id, "realm_string_id") self.assertEqual(realm.name, "realm name") self.assertFalse(realm.emails_restricted_to_domains) self.assertEqual(realm.description, "") self.assertTrue(realm.invite_required) self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_LIMITED) self.assertEqual(realm.org_type, Realm.ORG_TYPES["unspecified"]["id"]) self.assertEqual(type(realm.date_created), datetime.datetime) self.assertTrue( RealmAuditLog.objects.filter( realm=realm, event_type=RealmAuditLog.REALM_CREATED, event_time=realm.date_created ).exists() ) assert realm.notifications_stream is not None self.assertEqual(realm.notifications_stream.name, "general") self.assertEqual(realm.notifications_stream.realm, realm) assert realm.signup_notifications_stream is not None self.assertEqual(realm.signup_notifications_stream.name, "core team") self.assertEqual(realm.signup_notifications_stream.realm, realm) self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_LIMITED) def test_do_create_realm_with_keyword_arguments(self) -> None: date_created = timezone_now() - datetime.timedelta(days=100) realm = do_create_realm( "realm_string_id", "realm name", emails_restricted_to_domains=True, date_created=date_created, description="realm description", invite_required=False, plan_type=Realm.PLAN_TYPE_STANDARD_FREE, org_type=Realm.ORG_TYPES["community"]["id"], enable_read_receipts=True, ) self.assertEqual(realm.string_id, "realm_string_id") self.assertEqual(realm.name, "realm name") self.assertTrue(realm.emails_restricted_to_domains) self.assertEqual(realm.description, "realm description") self.assertFalse(realm.invite_required) self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_STANDARD_FREE) self.assertEqual(realm.org_type, Realm.ORG_TYPES["community"]["id"]) self.assertEqual(realm.date_created, date_created) self.assertEqual(realm.enable_read_receipts, True) self.assertTrue( RealmAuditLog.objects.filter( realm=realm, event_type=RealmAuditLog.REALM_CREATED, event_time=realm.date_created ).exists() ) assert realm.notifications_stream is not None self.assertEqual(realm.notifications_stream.name, "general") self.assertEqual(realm.notifications_stream.realm, realm) assert realm.signup_notifications_stream is not None self.assertEqual(realm.signup_notifications_stream.name, "core team") self.assertEqual(realm.signup_notifications_stream.realm, realm) def test_realm_is_web_public(self) -> None: realm = get_realm("zulip") # By default "Rome" is web_public in zulip realm rome = Stream.objects.get(name="Rome") self.assertEqual(rome.is_web_public, True) self.assertEqual(realm.has_web_public_streams(), True) self.assertEqual(realm.web_public_streams_enabled(), True) with self.settings(WEB_PUBLIC_STREAMS_ENABLED=False): self.assertEqual(realm.has_web_public_streams(), False) self.assertEqual(realm.web_public_streams_enabled(), False) realm.enable_spectator_access = False realm.save() self.assertEqual(realm.has_web_public_streams(), False) self.assertEqual(realm.web_public_streams_enabled(), False) realm.enable_spectator_access = True realm.save() # Convert Rome to a public stream rome.is_web_public = False rome.save() self.assertEqual(Stream.objects.filter(realm=realm, is_web_public=True).count(), 0) self.assertEqual(realm.web_public_streams_enabled(), True) self.assertEqual(realm.has_web_public_streams(), False) with self.settings(WEB_PUBLIC_STREAMS_ENABLED=False): self.assertEqual(realm.web_public_streams_enabled(), False) self.assertEqual(realm.has_web_public_streams(), False) # Restore state rome.is_web_public = True rome.save() self.assertEqual(Stream.objects.filter(realm=realm, is_web_public=True).count(), 1) self.assertEqual(realm.has_web_public_streams(), True) self.assertEqual(realm.web_public_streams_enabled(), True) with self.settings(WEB_PUBLIC_STREAMS_ENABLED=False): self.assertEqual(realm.web_public_streams_enabled(), False) self.assertEqual(realm.has_web_public_streams(), False) realm.plan_type = Realm.PLAN_TYPE_LIMITED realm.save() self.assertEqual(Stream.objects.filter(realm=realm, is_web_public=True).count(), 1) self.assertEqual(realm.web_public_streams_enabled(), False) self.assertEqual(realm.has_web_public_streams(), False) with self.settings(WEB_PUBLIC_STREAMS_ENABLED=False): self.assertEqual(realm.web_public_streams_enabled(), False) self.assertEqual(realm.has_web_public_streams(), False) def test_creating_realm_creates_system_groups(self) -> None: realm = do_create_realm("realm_string_id", "realm name") system_user_groups = UserGroup.objects.filter(realm=realm, is_system_group=True) self.assert_length(system_user_groups, 8) user_group_names = [group.name for group in system_user_groups] expected_system_group_names = [ UserGroup.OWNERS_GROUP_NAME, UserGroup.ADMINISTRATORS_GROUP_NAME, UserGroup.MODERATORS_GROUP_NAME, UserGroup.FULL_MEMBERS_GROUP_NAME, UserGroup.MEMBERS_GROUP_NAME, UserGroup.EVERYONE_GROUP_NAME, UserGroup.EVERYONE_ON_INTERNET_GROUP_NAME, UserGroup.NOBODY_GROUP_NAME, ] self.assertEqual(user_group_names.sort(), expected_system_group_names.sort()) def test_changing_waiting_period_updates_system_groups(self) -> None: realm = get_realm("zulip") members_system_group = UserGroup.objects.get( realm=realm, name=UserGroup.MEMBERS_GROUP_NAME, is_system_group=True ) full_members_system_group = UserGroup.objects.get( realm=realm, name=UserGroup.FULL_MEMBERS_GROUP_NAME, is_system_group=True ) self.assert_length(UserGroupMembership.objects.filter(user_group=members_system_group), 9) self.assert_length( UserGroupMembership.objects.filter(user_group=full_members_system_group), 9 ) self.assertEqual(realm.waiting_period_threshold, 0) hamlet = self.example_user("hamlet") othello = self.example_user("othello") prospero = self.example_user("prospero") self.assertTrue( UserGroupMembership.objects.filter( user_group=members_system_group, user_profile=hamlet ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=members_system_group, user_profile=othello ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=members_system_group, user_profile=prospero ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=full_members_system_group, user_profile=hamlet ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=full_members_system_group, user_profile=othello ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=full_members_system_group, user_profile=prospero ).exists() ) hamlet.date_joined = timezone_now() - timedelta(days=50) hamlet.save() othello.date_joined = timezone_now() - timedelta(days=75) othello.save() prospero.date_joined = timezone_now() - timedelta(days=150) prospero.save() do_set_realm_property(realm, "waiting_period_threshold", 100, acting_user=None) self.assertTrue( UserGroupMembership.objects.filter( user_group=members_system_group, user_profile=hamlet ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=members_system_group, user_profile=othello ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=members_system_group, user_profile=prospero ).exists() ) self.assertFalse( UserGroupMembership.objects.filter( user_group=full_members_system_group, user_profile=hamlet ).exists() ) self.assertFalse( UserGroupMembership.objects.filter( user_group=full_members_system_group, user_profile=othello ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=full_members_system_group, user_profile=prospero ).exists() ) do_set_realm_property(realm, "waiting_period_threshold", 70, acting_user=None) self.assertTrue( UserGroupMembership.objects.filter( user_group=members_system_group, user_profile=hamlet ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=members_system_group, user_profile=othello ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=members_system_group, user_profile=prospero ).exists() ) self.assertFalse( UserGroupMembership.objects.filter( user_group=full_members_system_group, user_profile=hamlet ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=full_members_system_group, user_profile=othello ).exists() ) self.assertTrue( UserGroupMembership.objects.filter( user_group=full_members_system_group, user_profile=prospero ).exists() ) class RealmAPITest(ZulipTestCase): def setUp(self) -> None: super().setUp() self.login("desdemona") def set_up_db(self, attr: str, value: Any) -> None: realm = get_realm("zulip") setattr(realm, attr, value) realm.save(update_fields=[attr]) def update_with_api(self, name: str, value: Union[int, str]) -> Realm: if not isinstance(value, str): value = orjson.dumps(value).decode() result = self.client_patch("/json/realm", {name: value}) self.assert_json_success(result) return get_realm("zulip") # refresh data def update_with_api_multiple_value(self, data_dict: Dict[str, Any]) -> Realm: result = self.client_patch("/json/realm", data_dict) self.assert_json_success(result) return get_realm("zulip") def do_test_realm_update_api(self, name: str) -> None: """Test updating realm properties. If new realm properties have been added to the Realm model but the test_values dict below has not been updated, this will raise an assertion error. """ bool_tests: List[bool] = [False, True] test_values: Dict[str, Any] = dict( default_language=["de", "en"], default_code_block_language=["javascript", ""], description=["Realm description", "New description"], digest_weekday=[0, 1, 2], message_retention_days=[10, 20], name=["Zulip", "New Name"], waiting_period_threshold=[10, 20], create_private_stream_policy=Realm.COMMON_POLICY_TYPES, create_public_stream_policy=Realm.COMMON_POLICY_TYPES, create_web_public_stream_policy=Realm.CREATE_WEB_PUBLIC_STREAM_POLICY_TYPES, user_group_edit_policy=Realm.COMMON_POLICY_TYPES, private_message_policy=Realm.PRIVATE_MESSAGE_POLICY_TYPES, invite_to_stream_policy=Realm.COMMON_POLICY_TYPES, wildcard_mention_policy=Realm.WILDCARD_MENTION_POLICY_TYPES, bot_creation_policy=Realm.BOT_CREATION_POLICY_TYPES, video_chat_provider=[ dict( video_chat_provider=orjson.dumps( Realm.VIDEO_CHAT_PROVIDERS["jitsi_meet"]["id"] ).decode(), ), ], giphy_rating=[ Realm.GIPHY_RATING_OPTIONS["y"]["id"], Realm.GIPHY_RATING_OPTIONS["r"]["id"], ], message_content_delete_limit_seconds=[1000, 1100, 1200], invite_to_realm_policy=Realm.INVITE_TO_REALM_POLICY_TYPES, move_messages_between_streams_policy=Realm.MOVE_MESSAGES_BETWEEN_STREAMS_POLICY_TYPES, add_custom_emoji_policy=Realm.COMMON_POLICY_TYPES, delete_own_message_policy=Realm.COMMON_MESSAGE_POLICY_TYPES, edit_topic_policy=Realm.EDIT_TOPIC_POLICY_TYPES, message_content_edit_limit_seconds=[1000, 1100, 1200], move_messages_within_stream_limit_seconds=[1000, 1100, 1200], move_messages_between_streams_limit_seconds=[1000, 1100, 1200], ) vals = test_values.get(name) if Realm.property_types[name] is bool: vals = bool_tests if vals is None: raise AssertionError(f"No test created for {name}") if name == "video_chat_provider": self.set_up_db(name, vals[0][name]) realm = self.update_with_api_multiple_value(vals[0]) self.assertEqual(getattr(realm, name), orjson.loads(vals[0][name])) return self.set_up_db(name, vals[0]) for val in vals[1:]: realm = self.update_with_api(name, val) self.assertEqual(getattr(realm, name), val) realm = self.update_with_api(name, vals[0]) self.assertEqual(getattr(realm, name), vals[0]) def test_update_realm_properties(self) -> None: for prop in Realm.property_types: with self.subTest(property=prop): self.do_test_realm_update_api(prop) # Not in Realm.property_types because org_type has # a unique RealmAuditLog event_type. def test_update_realm_org_type(self) -> None: vals = [t["id"] for t in Realm.ORG_TYPES.values()] self.set_up_db("org_type", vals[0]) for val in vals[1:]: realm = self.update_with_api("org_type", val) self.assertEqual(realm.org_type, val) realm = self.update_with_api("org_type", vals[0]) self.assertEqual(realm.org_type, vals[0]) # Now we test an invalid org_type id. invalid_org_type = 1 assert invalid_org_type not in vals result = self.client_patch("/json/realm", {"org_type": invalid_org_type}) self.assert_json_error(result, "Invalid org_type") def update_with_realm_default_api(self, name: str, val: Any) -> None: if not isinstance(val, str): val = orjson.dumps(val).decode() result = self.client_patch("/json/realm/user_settings_defaults", {name: val}) self.assert_json_success(result) def do_test_realm_default_setting_update_api(self, name: str) -> None: bool_tests: List[bool] = [False, True] test_values: Dict[str, Any] = dict( color_scheme=UserProfile.COLOR_SCHEME_CHOICES, default_view=["recent_topics", "all_messages"], emojiset=[emojiset["key"] for emojiset in RealmUserDefault.emojiset_choices()], demote_inactive_streams=UserProfile.DEMOTE_STREAMS_CHOICES, web_mark_read_on_scroll_policy=UserProfile.WEB_MARK_READ_ON_SCROLL_POLICY_CHOICES, user_list_style=UserProfile.USER_LIST_STYLE_CHOICES, desktop_icon_count_display=[1, 2, 3], notification_sound=["zulip", "ding"], email_notifications_batching_period_seconds=[120, 300], email_address_visibility=UserProfile.EMAIL_ADDRESS_VISIBILITY_TYPES, realm_name_in_email_notifications_policy=UserProfile.REALM_NAME_IN_EMAIL_NOTIFICATIONS_POLICY_CHOICES, ) vals = test_values.get(name) property_type = RealmUserDefault.property_types[name] if property_type is bool: vals = bool_tests if vals is None: raise AssertionError(f"No test created for {name}") realm = get_realm("zulip") realm_user_default = RealmUserDefault.objects.get(realm=realm) do_set_realm_user_default_setting(realm_user_default, name, vals[0], acting_user=None) for val in vals[1:]: self.update_with_realm_default_api(name, val) realm_user_default = RealmUserDefault.objects.get(realm=realm) self.assertEqual(getattr(realm_user_default, name), val) self.update_with_realm_default_api(name, vals[0]) realm_user_default = RealmUserDefault.objects.get(realm=realm) self.assertEqual(getattr(realm_user_default, name), vals[0]) def test_update_default_realm_settings(self) -> None: for prop in RealmUserDefault.property_types: # enable_marketing_emails setting is not actually used and thus cannot be updated # using this endpoint. It is included in notification_setting_types only for avoiding # duplicate code. default_language is currently present in Realm table also and thus # is updated using '/realm' endpoint, but this will be removed in future and the # settings in RealmUserDefault table will be used. if prop in ["default_language", "enable_login_emails", "enable_marketing_emails"]: continue self.do_test_realm_default_setting_update_api(prop) def test_invalid_default_notification_sound_value(self) -> None: result = self.client_patch( "/json/realm/user_settings_defaults", {"notification_sound": "invalid"} ) self.assert_json_error(result, "Invalid notification sound 'invalid'") result = self.client_patch( "/json/realm/user_settings_defaults", {"notification_sound": "zulip"} ) self.assert_json_success(result) realm = get_realm("zulip") realm_user_default = RealmUserDefault.objects.get(realm=realm) self.assertEqual(realm_user_default.notification_sound, "zulip") def test_invalid_email_notifications_batching_period_setting(self) -> None: result = self.client_patch( "/json/realm/user_settings_defaults", {"email_notifications_batching_period_seconds": -1}, ) self.assert_json_error(result, "Invalid email batching period: -1 seconds") result = self.client_patch( "/json/realm/user_settings_defaults", {"email_notifications_batching_period_seconds": 7 * 24 * 60 * 60 + 10}, ) self.assert_json_error(result, "Invalid email batching period: 604810 seconds") def test_ignored_parameters_in_realm_default_endpoint(self) -> None: params = {"starred_message_counts": orjson.dumps(False).decode(), "emoji_set": "twitter"} result = self.client_patch("/json/realm/user_settings_defaults", params) self.assert_json_success(result, ignored_parameters=["emoji_set"]) realm = get_realm("zulip") realm_user_default = RealmUserDefault.objects.get(realm=realm) self.assertEqual(realm_user_default.starred_message_counts, False) def test_update_realm_move_messages_within_stream_limit_seconds_unlimited_value(self) -> None: realm = get_realm("zulip") self.login("iago") realm = self.update_with_api( "move_messages_within_stream_limit_seconds", orjson.dumps("unlimited").decode() ) self.assertEqual(realm.move_messages_within_stream_limit_seconds, None) def test_update_realm_move_messages_between_streams_limit_seconds_unlimited_value(self) -> None: realm = get_realm("zulip") self.login("iago") realm = self.update_with_api( "move_messages_between_streams_limit_seconds", orjson.dumps("unlimited").decode() ) self.assertEqual(realm.move_messages_between_streams_limit_seconds, None) def test_update_realm_delete_own_message_policy(self) -> None: """Tests updating the realm property 'delete_own_message_policy'.""" self.set_up_db("delete_own_message_policy", Realm.POLICY_EVERYONE) realm = self.update_with_api("delete_own_message_policy", Realm.POLICY_ADMINS_ONLY) self.assertEqual(realm.delete_own_message_policy, Realm.POLICY_ADMINS_ONLY) self.assertEqual(realm.message_content_delete_limit_seconds, 600) realm = self.update_with_api("delete_own_message_policy", Realm.POLICY_EVERYONE) realm = self.update_with_api("message_content_delete_limit_seconds", 100) self.assertEqual(realm.delete_own_message_policy, Realm.POLICY_EVERYONE) self.assertEqual(realm.message_content_delete_limit_seconds, 100) realm = self.update_with_api( "message_content_delete_limit_seconds", orjson.dumps("unlimited").decode() ) self.assertEqual(realm.message_content_delete_limit_seconds, None) realm = self.update_with_api("message_content_delete_limit_seconds", 600) self.assertEqual(realm.delete_own_message_policy, Realm.POLICY_EVERYONE) self.assertEqual(realm.message_content_delete_limit_seconds, 600) realm = self.update_with_api("delete_own_message_policy", Realm.POLICY_MODERATORS_ONLY) self.assertEqual(realm.delete_own_message_policy, Realm.POLICY_MODERATORS_ONLY) realm = self.update_with_api("delete_own_message_policy", Realm.POLICY_FULL_MEMBERS_ONLY) self.assertEqual(realm.delete_own_message_policy, Realm.POLICY_FULL_MEMBERS_ONLY) realm = self.update_with_api("delete_own_message_policy", Realm.POLICY_MEMBERS_ONLY) self.assertEqual(realm.delete_own_message_policy, Realm.POLICY_MEMBERS_ONLY) # Test that 0 is invalid value. req = dict(message_content_delete_limit_seconds=orjson.dumps(0).decode()) result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Bad value for 'message_content_delete_limit_seconds': 0") # Test that only "unlimited" string is valid and others are invalid. req = dict(message_content_delete_limit_seconds=orjson.dumps("invalid").decode()) result = self.client_patch("/json/realm", req) self.assert_json_error( result, "Bad value for 'message_content_delete_limit_seconds': invalid" ) def do_test_changing_settings_by_owners_only(self, setting_name: str) -> None: bool_tests: List[bool] = [False, True] test_values: Dict[str, Any] = dict( invite_to_realm_policy=[Realm.POLICY_MEMBERS_ONLY, Realm.POLICY_ADMINS_ONLY], waiting_period_threshold=[10, 20], ) vals = test_values.get(setting_name) if Realm.property_types[setting_name] is bool: vals = bool_tests assert vals is not None self.set_up_db(setting_name, vals[0]) value = vals[1] if not isinstance(value, str): value = orjson.dumps(value).decode() self.login("iago") result = self.client_patch("/json/realm", {setting_name: value}) self.assert_json_error(result, "Must be an organization owner") self.login("desdemona") result = self.client_patch("/json/realm", {setting_name: value}) self.assert_json_success(result) realm = get_realm("zulip") self.assertEqual(getattr(realm, setting_name), vals[1]) def test_changing_user_joining_settings_require_owners(self) -> None: self.do_test_changing_settings_by_owners_only("invite_to_realm_policy") self.do_test_changing_settings_by_owners_only("invite_required") self.do_test_changing_settings_by_owners_only("emails_restricted_to_domains") self.do_test_changing_settings_by_owners_only("disallow_disposable_email_addresses") self.do_test_changing_settings_by_owners_only("waiting_period_threshold") def test_enable_spectator_access_for_limited_plan_realms(self) -> None: self.login("iago") realm = get_realm("zulip") do_change_realm_plan_type(realm, Realm.PLAN_TYPE_LIMITED, acting_user=None) self.assertFalse(realm.enable_spectator_access) req = {"enable_spectator_access": orjson.dumps(True).decode()} result = self.client_patch("/json/realm", req) self.assert_json_error(result, "Available on Zulip Cloud Standard. Upgrade to access.") class ScrubRealmTest(ZulipTestCase): def test_do_delete_all_realm_attachments(self) -> None: realm = get_realm("zulip") hamlet = self.example_user("hamlet") Attachment.objects.filter(realm=realm).delete() assert settings.LOCAL_UPLOADS_DIR is not None assert settings.LOCAL_FILES_DIR is not None path_ids = [] for n in range(1, 4): content = f"content{n}".encode() url = upload_message_attachment( f"dummy{n}.txt", len(content), "text/plain", content, hamlet ) base = "/user_uploads/" self.assertEqual(base, url[: len(base)]) path_id = re.sub("/user_uploads/", "", url) self.assertTrue(os.path.isfile(os.path.join(settings.LOCAL_FILES_DIR, path_id))) path_ids.append(path_id) with mock.patch( "zerver.actions.realm_settings.delete_message_attachments", side_effect=delete_message_attachments, ) as p: do_delete_all_realm_attachments(realm, batch_size=2) self.assertEqual(p.call_count, 2) p.assert_has_calls( [ mock.call([path_ids[0], path_ids[1]]), mock.call([path_ids[2]]), ] ) self.assertEqual(Attachment.objects.filter(realm=realm).count(), 0) for file_path in path_ids: self.assertFalse(os.path.isfile(os.path.join(settings.LOCAL_FILES_DIR, path_id))) def test_scrub_realm(self) -> None: zulip = get_realm("zulip") lear = get_realm("lear") internal_realm = get_realm(settings.SYSTEM_BOT_REALM) hamlet = self.example_user("hamlet") iago = self.example_user("iago") othello = self.example_user("othello") cordelia = self.lear_user("cordelia") king = self.lear_user("king") notification_bot = get_system_bot(settings.NOTIFICATION_BOT, internal_realm.id) create_stream_if_needed(lear, "Shakespeare") self.subscribe(cordelia, "Shakespeare") self.subscribe(king, "Shakespeare") Message.objects.all().delete() UserMessage.objects.all().delete() for i in range(5): self.send_stream_message(iago, "Scotland") self.send_stream_message(othello, "Scotland") self.send_stream_message(cordelia, "Shakespeare") self.send_stream_message(king, "Shakespeare") internal_send_stream_message( notification_bot, get_stream("Scotland", zulip), "test", "test" ) internal_send_private_message(notification_bot, othello, "test") internal_send_huddle_message(zulip, notification_bot, [othello.email, iago.email], "test") internal_send_stream_message( notification_bot, get_stream("Shakespeare", lear), "test", "test" ) internal_send_private_message(notification_bot, king, "test") internal_send_huddle_message(lear, notification_bot, [cordelia.email, king.email], "test") Attachment.objects.filter(realm=zulip).delete() Attachment.objects.filter(realm=lear).delete() assert settings.LOCAL_UPLOADS_DIR is not None assert settings.LOCAL_FILES_DIR is not None file_paths = [] for n, owner in enumerate([iago, othello, hamlet, cordelia, king]): content = f"content{n}".encode() url = upload_message_attachment( f"dummy{n}.txt", len(content), "text/plain", content, owner ) base = "/user_uploads/" self.assertEqual(base, url[: len(base)]) file_path = os.path.join(settings.LOCAL_FILES_DIR, re.sub("/user_uploads/", "", url)) self.assertTrue(os.path.isfile(file_path)) file_paths.append(file_path) CustomProfileField.objects.create(realm=lear) self.assertEqual(Message.objects.filter(sender__in=[iago, othello]).count(), 10) self.assertEqual(Message.objects.filter(sender__in=[cordelia, king]).count(), 10) self.assertEqual(Message.objects.filter(sender=notification_bot).count(), 6) self.assertEqual(UserMessage.objects.filter(user_profile__in=[iago, othello]).count(), 25) self.assertEqual(UserMessage.objects.filter(user_profile__in=[cordelia, king]).count(), 25) self.assertNotEqual(CustomProfileField.objects.filter(realm=zulip).count(), 0) with self.assertLogs(level="WARNING"): do_scrub_realm(zulip, acting_user=None) self.assertEqual(Message.objects.filter(sender__in=[iago, othello]).count(), 0) self.assertEqual(Message.objects.filter(sender__in=[cordelia, king]).count(), 10) self.assertEqual(Message.objects.filter(sender=notification_bot).count(), 3) self.assertEqual(UserMessage.objects.filter(user_profile__in=[iago, othello]).count(), 0) self.assertEqual(UserMessage.objects.filter(user_profile__in=[cordelia, king]).count(), 25) self.assertEqual(Attachment.objects.filter(realm=zulip).count(), 0) self.assertEqual(Attachment.objects.filter(realm=lear).count(), 2) # Zulip realm files don't exist on disk, Lear ones do self.assertFalse(os.path.isfile(file_paths[0])) self.assertFalse(os.path.isfile(file_paths[1])) self.assertFalse(os.path.isfile(file_paths[2])) self.assertTrue(os.path.isfile(file_paths[3])) self.assertTrue(os.path.isfile(file_paths[4])) self.assertEqual(CustomProfileField.objects.filter(realm=zulip).count(), 0) self.assertNotEqual(CustomProfileField.objects.filter(realm=lear).count(), 0) zulip_users = UserProfile.objects.filter(realm=zulip) for user in zulip_users: self.assertRegex(user.full_name, r"^Scrubbed [a-z0-9]{15}$") self.assertRegex(user.email, rf"^scrubbed-[a-z0-9]{{15}}@{re.escape(zulip.host)}$") self.assertRegex( user.delivery_email, rf"^scrubbed-[a-z0-9]{{15}}@{re.escape(zulip.host)}$" ) lear_users = UserProfile.objects.filter(realm=lear) for user in lear_users: self.assertNotRegex(user.full_name, r"^Scrubbed [a-z0-9]{15}$") self.assertNotRegex(user.email, rf"^scrubbed-[a-z0-9]{{15}}@{re.escape(zulip.host)}$") self.assertNotRegex( user.delivery_email, rf"^scrubbed-[a-z0-9]{{15}}@{re.escape(zulip.host)}$" )