import orjson from django.core.exceptions import ValidationError from django.db.utils import IntegrityError from zerver.lib.actions import ( do_change_realm_domain, do_change_user_role, do_create_realm, do_remove_realm_domain, do_set_realm_property, ) from zerver.lib.domains import validate_domain from zerver.lib.email_validation import email_allowed_for_realm from zerver.lib.test_classes import ZulipTestCase from zerver.models import DomainNotAllowedForRealmError, RealmDomain, UserProfile, get_realm class RealmDomainTest(ZulipTestCase): def setUp(self) -> None: realm = get_realm('zulip') do_set_realm_property(realm, 'emails_restricted_to_domains', True) def test_list_realm_domains(self) -> None: self.login('iago') realm = get_realm('zulip') RealmDomain.objects.create(realm=realm, domain='acme.com', allow_subdomains=True) result = self.client_get("/json/realm/domains") self.assert_json_success(result) received = orjson.dumps(result.json()['domains'], option=orjson.OPT_SORT_KEYS) expected = orjson.dumps([{'domain': 'zulip.com', 'allow_subdomains': False}, {'domain': 'acme.com', 'allow_subdomains': True}], option=orjson.OPT_SORT_KEYS) self.assertEqual(received, expected) def test_not_realm_admin(self) -> None: self.login('hamlet') result = self.client_post("/json/realm/domains") self.assert_json_error(result, 'Must be an organization administrator') result = self.client_patch("/json/realm/domains/15") self.assert_json_error(result, 'Must be an organization administrator') result = self.client_delete("/json/realm/domains/15") self.assert_json_error(result, 'Must be an organization administrator') def test_create_realm_domain(self) -> None: self.login('iago') data = {'domain': orjson.dumps('').decode(), 'allow_subdomains': orjson.dumps(True).decode()} result = self.client_post("/json/realm/domains", info=data) self.assert_json_error(result, 'Invalid domain: Domain can\'t be empty.') data['domain'] = orjson.dumps('acme.com').decode() result = self.client_post("/json/realm/domains", info=data) self.assert_json_success(result) realm = get_realm('zulip') self.assertTrue(RealmDomain.objects.filter(realm=realm, domain='acme.com', allow_subdomains=True).exists()) result = self.client_post("/json/realm/domains", info=data) self.assert_json_error(result, 'The domain acme.com is already a part of your organization.') mit_user_profile = self.mit_user("sipbtest") self.login_user(mit_user_profile) do_change_user_role(mit_user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR) result = self.client_post("/json/realm/domains", info=data, HTTP_HOST=mit_user_profile.realm.host) self.assert_json_success(result) def test_patch_realm_domain(self) -> None: self.login('iago') realm = get_realm('zulip') RealmDomain.objects.create(realm=realm, domain='acme.com', allow_subdomains=False) data = { 'allow_subdomains': orjson.dumps(True).decode(), } url = "/json/realm/domains/acme.com" result = self.client_patch(url, data) self.assert_json_success(result) self.assertTrue(RealmDomain.objects.filter(realm=realm, domain='acme.com', allow_subdomains=True).exists()) url = "/json/realm/domains/non-existent.com" result = self.client_patch(url, data) self.assertEqual(result.status_code, 400) self.assert_json_error(result, 'No entry found for domain non-existent.com.') def test_delete_realm_domain(self) -> None: self.login('iago') realm = get_realm('zulip') RealmDomain.objects.create(realm=realm, domain='acme.com') result = self.client_delete("/json/realm/domains/non-existent.com") self.assertEqual(result.status_code, 400) self.assert_json_error(result, 'No entry found for domain non-existent.com.') result = self.client_delete("/json/realm/domains/acme.com") self.assert_json_success(result) self.assertFalse(RealmDomain.objects.filter(domain='acme.com').exists()) self.assertTrue(realm.emails_restricted_to_domains) def test_delete_all_realm_domains(self) -> None: self.login('iago') realm = get_realm('zulip') query = RealmDomain.objects.filter(realm=realm) self.assertTrue(realm.emails_restricted_to_domains) for realm_domain in query.all(): do_remove_realm_domain(realm_domain) self.assertEqual(query.count(), 0) # Deleting last realm_domain should set `emails_restricted_to_domains` to False. # This should be tested on a fresh instance, since the cached objects # would not be updated. self.assertFalse(get_realm('zulip').emails_restricted_to_domains) def test_email_allowed_for_realm(self) -> None: realm1 = do_create_realm('testrealm1', 'Test Realm 1', emails_restricted_to_domains=True) realm2 = do_create_realm('testrealm2', 'Test Realm 2', emails_restricted_to_domains=True) realm_domain = RealmDomain.objects.create(realm=realm1, domain='test1.com', allow_subdomains=False) RealmDomain.objects.create(realm=realm2, domain='test2.test1.com', allow_subdomains=True) email_allowed_for_realm('user@test1.com', realm1) with self.assertRaises(DomainNotAllowedForRealmError): email_allowed_for_realm('user@test2.test1.com', realm1) email_allowed_for_realm('user@test2.test1.com', realm2) email_allowed_for_realm('user@test3.test2.test1.com', realm2) with self.assertRaises(DomainNotAllowedForRealmError): email_allowed_for_realm('user@test3.test1.com', realm2) do_change_realm_domain(realm_domain, True) email_allowed_for_realm('user@test1.com', realm1) email_allowed_for_realm('user@test2.test1.com', realm1) with self.assertRaises(DomainNotAllowedForRealmError): email_allowed_for_realm('user@test2.com', realm1) def test_realm_realm_domains_uniqueness(self) -> None: realm = get_realm('zulip') with self.assertRaises(IntegrityError): RealmDomain.objects.create(realm=realm, domain='zulip.com', allow_subdomains=True) def test_validate_domain(self) -> None: invalid_domains = ['', 'test', 't.', 'test.', '.com', '-test', 'test...com', 'test-', 'test_domain.com', 'test.-domain.com', 'a' * 255 + ".com"] for domain in invalid_domains: with self.assertRaises(ValidationError): validate_domain(domain) valid_domains = ['acme.com', 'x-x.y.3.z'] for domain in valid_domains: validate_domain(domain)