from unittest import mock import orjson from zerver.lib.actions import do_set_realm_property, ensure_stream from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import most_recent_usermessage from zerver.lib.user_groups import ( check_add_user_to_user_group, check_remove_user_from_user_group, create_user_group, get_memberships_of_users, get_user_groups, user_groups_in_realm, user_groups_in_realm_serialized, ) from zerver.models import Realm, UserGroup, UserGroupMembership, UserProfile, get_realm class UserGroupTestCase(ZulipTestCase): def create_user_group_for_test( self, group_name: str, realm: Realm = get_realm("zulip") ) -> UserGroup: members = [self.example_user("othello")] return create_user_group(group_name, members, realm) def test_user_groups_in_realm(self) -> None: realm = get_realm("zulip") self.assert_length(user_groups_in_realm(realm), 1) self.create_user_group_for_test("support") user_groups = user_groups_in_realm(realm) self.assert_length(user_groups, 2) names = {ug.name for ug in user_groups} self.assertEqual(names, {"hamletcharacters", "support"}) def test_user_groups_in_realm_serialized(self) -> None: realm = get_realm("zulip") user_group = UserGroup.objects.first() membership = UserGroupMembership.objects.filter(user_group=user_group) membership = membership.values_list("user_profile_id", flat=True) empty_user_group = create_user_group("newgroup", [], realm) user_groups = user_groups_in_realm_serialized(realm) self.assert_length(user_groups, 2) self.assertEqual(user_groups[0]["id"], user_group.id) self.assertEqual(user_groups[0]["name"], "hamletcharacters") self.assertEqual(user_groups[0]["description"], "Characters of Hamlet") self.assertEqual(set(user_groups[0]["members"]), set(membership)) self.assertEqual(user_groups[1]["id"], empty_user_group.id) self.assertEqual(user_groups[1]["name"], "newgroup") self.assertEqual(user_groups[1]["description"], "") self.assertEqual(user_groups[1]["members"], []) def test_get_user_groups(self) -> None: othello = self.example_user("othello") self.create_user_group_for_test("support") user_groups = get_user_groups(othello) self.assert_length(user_groups, 1) self.assertEqual(user_groups[0].name, "support") def test_check_add_user_to_user_group(self) -> None: user_group = self.create_user_group_for_test("support") hamlet = self.example_user("hamlet") self.assertTrue(check_add_user_to_user_group(hamlet, user_group)) self.assertFalse(check_add_user_to_user_group(hamlet, user_group)) def test_check_remove_user_from_user_group(self) -> None: user_group = self.create_user_group_for_test("support") othello = self.example_user("othello") self.assertTrue(check_remove_user_from_user_group(othello, user_group)) self.assertFalse(check_remove_user_from_user_group(othello, user_group)) with mock.patch( "zerver.lib.user_groups.remove_user_from_user_group", side_effect=Exception ): self.assertFalse(check_remove_user_from_user_group(othello, user_group)) class UserGroupAPITestCase(ZulipTestCase): def test_user_group_create(self) -> None: hamlet = self.example_user("hamlet") # Test success self.login("hamlet") params = { "name": "support", "members": orjson.dumps([hamlet.id]).decode(), "description": "Support team", } result = self.client_post("/json/user_groups/create", info=params) self.assert_json_success(result) self.assert_length(UserGroup.objects.all(), 2) # Test invalid member error params = { "name": "backend", "members": orjson.dumps([1111]).decode(), "description": "Backend team", } result = self.client_post("/json/user_groups/create", info=params) self.assert_json_error(result, "Invalid user ID: 1111") self.assert_length(UserGroup.objects.all(), 2) # Test we cannot add hamlet again params = { "name": "support", "members": orjson.dumps([hamlet.id]).decode(), "description": "Support team", } result = self.client_post("/json/user_groups/create", info=params) self.assert_json_error(result, "User group 'support' already exists.") self.assert_length(UserGroup.objects.all(), 2) def test_user_group_get(self) -> None: # Test success user_profile = self.example_user("hamlet") self.login_user(user_profile) result = self.client_get("/json/user_groups") self.assert_json_success(result) self.assert_length( result.json()["user_groups"], UserGroup.objects.filter(realm=user_profile.realm).count() ) def test_can_edit_user_groups(self) -> None: def validation_func(user_profile: UserProfile) -> bool: user_profile.refresh_from_db() return user_profile.can_edit_user_groups() self.check_has_permission_policies("user_group_edit_policy", validation_func) def test_user_group_create_by_guest_user(self) -> None: guest_user = self.example_user("polonius") # Guest users can't create user group self.login_user(guest_user) params = { "name": "support", "members": orjson.dumps([guest_user.id]).decode(), "description": "Support team", } result = self.client_post("/json/user_groups/create", info=params) self.assert_json_error(result, "Not allowed for guest users") def test_user_group_update(self) -> None: hamlet = self.example_user("hamlet") self.login("hamlet") params = { "name": "support", "members": orjson.dumps([hamlet.id]).decode(), "description": "Support team", } self.client_post("/json/user_groups/create", info=params) user_group = UserGroup.objects.get(name="support") # Test success params = { "name": "help", "description": "Troubleshooting team", } result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params) self.assert_json_success(result) # Test when new data is not supplied. result = self.client_patch(f"/json/user_groups/{user_group.id}", info={}) self.assert_json_error(result, "No new data supplied") # Test when invalid user group is supplied params = {"name": "help"} result = self.client_patch("/json/user_groups/1111", info=params) self.assert_json_error(result, "Invalid user group") self.logout() # Test when user not a member of user group tries to modify it cordelia = self.example_user("cordelia") self.login_user(cordelia) params = { "name": "help", "description": "Troubleshooting", } result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params) self.assert_json_error( result, "Only group members and organization administrators can administer this group." ) self.logout() # Test when organization admin tries to modify group iago = self.example_user("iago") self.login_user(iago) params = { "name": "help", "description": "Troubleshooting", } result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params) self.assert_json_success(result) def test_user_group_update_by_guest_user(self) -> None: hamlet = self.example_user("hamlet") guest_user = self.example_user("polonius") self.login_user(hamlet) params = { "name": "support", "members": orjson.dumps([hamlet.id, guest_user.id]).decode(), "description": "Support team", } result = self.client_post("/json/user_groups/create", info=params) self.assert_json_success(result) user_group = UserGroup.objects.get(name="support") # Guest user can't edit any detail of an user group self.login_user(guest_user) params = { "name": "help", "description": "Troubleshooting team", } result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params) self.assert_json_error(result, "Not allowed for guest users") def test_user_group_update_to_already_existing_name(self) -> None: hamlet = self.example_user("hamlet") self.login_user(hamlet) realm = get_realm("zulip") support_user_group = create_user_group("support", [hamlet], realm) marketing_user_group = create_user_group("marketing", [hamlet], realm) params = { "name": marketing_user_group.name, } result = self.client_patch(f"/json/user_groups/{support_user_group.id}", info=params) self.assert_json_error(result, f"User group '{marketing_user_group.name}' already exists.") def test_user_group_delete(self) -> None: hamlet = self.example_user("hamlet") self.login("hamlet") params = { "name": "support", "members": orjson.dumps([hamlet.id]).decode(), "description": "Support team", } self.client_post("/json/user_groups/create", info=params) user_group = UserGroup.objects.get(name="support") # Test success self.assertEqual(UserGroup.objects.count(), 2) self.assertEqual(UserGroupMembership.objects.count(), 3) result = self.client_delete(f"/json/user_groups/{user_group.id}") self.assert_json_success(result) self.assertEqual(UserGroup.objects.count(), 1) self.assertEqual(UserGroupMembership.objects.count(), 2) # Test when invalid user group is supplied result = self.client_delete("/json/user_groups/1111") self.assert_json_error(result, "Invalid user group") # Test when user not a member of user group tries to delete it params = { "name": "Development", "members": orjson.dumps([hamlet.id]).decode(), "description": "Development team", } self.client_post("/json/user_groups/create", info=params) user_group = UserGroup.objects.get(name="Development") self.assertEqual(UserGroup.objects.count(), 2) self.logout() cordelia = self.example_user("cordelia") self.login_user(cordelia) result = self.client_delete(f"/json/user_groups/{user_group.id}") self.assert_json_error( result, "Only group members and organization administrators can administer this group." ) self.assertEqual(UserGroup.objects.count(), 2) self.logout() # Test when organization admin tries to delete group iago = self.example_user("iago") self.login_user(iago) result = self.client_delete(f"/json/user_groups/{user_group.id}") self.assert_json_success(result) self.assertEqual(UserGroup.objects.count(), 1) self.assertEqual(UserGroupMembership.objects.count(), 2) def test_user_group_delete_by_guest_user(self) -> None: hamlet = self.example_user("hamlet") guest_user = self.example_user("polonius") self.login_user(hamlet) params = { "name": "support", "members": orjson.dumps([hamlet.id, guest_user.id]).decode(), "description": "Support team", } result = self.client_post("/json/user_groups/create", info=params) self.assert_json_success(result) user_group = UserGroup.objects.get(name="support") # Guest users can't delete any user group(not even those of which they are a member) self.login_user(guest_user) result = self.client_delete(f"/json/user_groups/{user_group.id}") self.assert_json_error(result, "Not allowed for guest users") def test_update_members_of_user_group(self) -> None: hamlet = self.example_user("hamlet") self.login("hamlet") params = { "name": "support", "members": orjson.dumps([hamlet.id]).decode(), "description": "Support team", } self.client_post("/json/user_groups/create", info=params) user_group = UserGroup.objects.get(name="support") # Test add members self.assertEqual(UserGroupMembership.objects.count(), 3) othello = self.example_user("othello") add = [othello.id] params = {"add": orjson.dumps(add).decode()} result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_success(result) self.assertEqual(UserGroupMembership.objects.count(), 4) members = get_memberships_of_users(user_group, [hamlet, othello]) self.assert_length(members, 2) # Test adding a member already there. result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_error(result, f"User {othello.id} is already a member of this group") self.assertEqual(UserGroupMembership.objects.count(), 4) members = get_memberships_of_users(user_group, [hamlet, othello]) self.assert_length(members, 2) self.logout() # Test when user not a member of user group tries to add members to it cordelia = self.example_user("cordelia") self.login_user(cordelia) add = [cordelia.id] params = {"add": orjson.dumps(add).decode()} result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_error( result, "Only group members and organization administrators can administer this group." ) self.assertEqual(UserGroupMembership.objects.count(), 4) self.logout() # Test when organization admin tries to add members to group iago = self.example_user("iago") self.login_user(iago) aaron = self.example_user("aaron") add = [aaron.id] params = {"add": orjson.dumps(add).decode()} result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_success(result) self.assertEqual(UserGroupMembership.objects.count(), 5) members = get_memberships_of_users(user_group, [hamlet, othello, aaron]) self.assert_length(members, 3) # For normal testing we again log in with hamlet self.logout() self.login_user(hamlet) # Test remove members params = {"delete": orjson.dumps([othello.id]).decode()} result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_success(result) self.assertEqual(UserGroupMembership.objects.count(), 4) members = get_memberships_of_users(user_group, [hamlet, othello, aaron]) self.assert_length(members, 2) # Test remove a member that's already removed params = {"delete": orjson.dumps([othello.id]).decode()} result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_error(result, f"There is no member '{othello.id}' in this user group") self.assertEqual(UserGroupMembership.objects.count(), 4) members = get_memberships_of_users(user_group, [hamlet, othello, aaron]) self.assert_length(members, 2) # Test when nothing is provided result = self.client_post(f"/json/user_groups/{user_group.id}/members", info={}) msg = 'Nothing to do. Specify at least one of "add" or "delete".' self.assert_json_error(result, msg) # Test when user not a member of user group tries to remove members self.logout() self.login_user(cordelia) params = {"delete": orjson.dumps([hamlet.id]).decode()} result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_error( result, "Only group members and organization administrators can administer this group." ) self.assertEqual(UserGroupMembership.objects.count(), 4) self.logout() # Test when organization admin tries to remove members from group iago = self.example_user("iago") self.login_user(iago) result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_success(result) self.assertEqual(UserGroupMembership.objects.count(), 3) members = get_memberships_of_users(user_group, [hamlet, othello, aaron]) self.assert_length(members, 1) def test_mentions(self) -> None: cordelia = self.example_user("cordelia") hamlet = self.example_user("hamlet") othello = self.example_user("othello") zoe = self.example_user("ZOE") realm = cordelia.realm group_name = "support" stream_name = "Dev help" content_with_group_mention = "hey @*support* can you help us with this?" ensure_stream(realm, stream_name, acting_user=None) all_users = {cordelia, hamlet, othello, zoe} support_team = {hamlet, zoe} sender = cordelia other_users = all_users - support_team for user in all_users: self.subscribe(user, stream_name) create_user_group( name=group_name, members=list(support_team), realm=realm, ) payload = dict( type="stream", to=stream_name, client="test suite", topic="whatever", content=content_with_group_mention, ) result = self.api_post(sender, "/json/messages", payload) self.assert_json_success(result) for user in support_team: um = most_recent_usermessage(user) self.assertTrue(um.flags.mentioned) for user in other_users: um = most_recent_usermessage(user) self.assertFalse(um.flags.mentioned) def test_only_admin_manage_groups(self) -> None: iago = self.example_user("iago") hamlet = self.example_user("hamlet") cordelia = self.example_user("cordelia") self.login_user(iago) do_set_realm_property( iago.realm, "user_group_edit_policy", Realm.USER_GROUP_EDIT_POLICY_ADMINS, acting_user=None, ) params = { "name": "support", "members": orjson.dumps([iago.id, hamlet.id]).decode(), "description": "Support team", } result = self.client_post("/json/user_groups/create", info=params) self.assert_json_success(result) user_group = UserGroup.objects.get(name="support") # Test add member params = {"add": orjson.dumps([cordelia.id]).decode()} result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_success(result) # Test remove member params = {"delete": orjson.dumps([cordelia.id]).decode()} result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_success(result) # Test changing groups name params = { "name": "help", "description": "Troubleshooting", } result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params) self.assert_json_success(result) # Test delete a group result = self.client_delete(f"/json/user_groups/{user_group.id}") self.assert_json_success(result) user_group = create_user_group( name="support", members=[hamlet, iago], realm=iago.realm, ) self.logout() self.login("hamlet") # Test creating a group params = { "name": "support2", "members": orjson.dumps([hamlet.id]).decode(), "description": "Support team", } result = self.client_post("/json/user_groups/create", info=params) self.assert_json_error(result, "Must be an organization administrator") # Test add member params = {"add": orjson.dumps([cordelia.id]).decode()} result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params) self.assert_json_error(result, "Must be an organization administrator") # Test delete a group result = self.client_delete(f"/json/user_groups/{user_group.id}") self.assert_json_error(result, "Must be an organization administrator") # Test changing groups name params = { "name": "help", "description": "Troubleshooting", } result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params) self.assert_json_error(result, "Must be an organization administrator")