zulip/zerver/tests/test_user_groups.py

3336 lines
142 KiB
Python
Raw Normal View History

from collections.abc import Iterable
from datetime import datetime, timedelta
from unittest import mock
import orjson
import time_machine
from django.utils.timezone import now as timezone_now
user_groups: Make locks required for updating user group memberships. **Background** User groups are expected to comply with the DAG constraint for the many-to-many inter-group membership. The check for this constraint has to be performed recursively so that we can find all direct and indirect subgroups of the user group to be added. This kind of check is vulnerable to phantom reads which is possible at the default read committed isolation level because we cannot guarantee that the check is still valid when we are adding the subgroups to the user group. **Solution** To avoid having another transaction concurrently update one of the to-be-subgroup after the recursive check is done, and before the subgroup is added, we use SELECT FOR UPDATE to lock the user group rows. The lock needs to be acquired before a group membership change is about to occur before any check has been conducted. Suppose that we are adding subgroup B to supergroup A, the locking protocol is specified as follows: 1. Acquire a lock for B and all its direct and indirect subgroups. 2. Acquire a lock for A. For the removal of user groups, we acquire a lock for the user group to be removed with all its direct and indirect subgroups. This is the special case A=B, which is still complaint with the protocol. **Error handling** We currently rely on Postgres' deadlock detection to abort transactions and show an error for the users. In the future, we might need some recovery mechanism or at least better error handling. **Notes** An important note is that we need to reuse the recursive CTE query that finds the direct and indirect subgroups when applying the lock on the rows. And the lock needs to be acquired the same way for the addition and removal of direct subgroups. User membership change (as opposed to user group membership) is not affected. Read-only queries aren't either. The locks only protect critical regions where the user group dependency graph might violate the DAG constraint, where users are not participating. **Testing** We implement a transaction test case targeting some typical scenarios when an internal server error is expected to happen (this means that the user group view makes the correct decision to abort the transaction when something goes wrong with locks). To achieve this, we add a development view intended only for unit tests. It has a global BARRIER that can be shared across threads, so that we can synchronize them to consistently reproduce certain potential race conditions prevented by the database locks. The transaction test case lanuches pairs of threads initiating possibly conflicting requests at the same time. The tests are set up such that exactly N of them are expected to succeed with a certain error message (while we don't know each one). **Security notes** get_recursive_subgroups_for_groups will no longer fetch user groups from other realms. As a result, trying to add/remove a subgroup from another realm results in a UserGroup not found error response. We also implement subgroup-specific checks in has_user_group_access to keep permission managing in a single place. Do note that the API currently don't have a way to violate that check because we are only checking the realm ID now.
2023-06-17 04:39:52 +02:00
from zerver.actions.create_realm import do_create_realm
from zerver.actions.create_user import do_reactivate_user
from zerver.actions.realm_settings import (
do_change_realm_permission_group_setting,
do_set_realm_property,
)
from zerver.actions.streams import (
do_change_stream_group_based_setting,
do_deactivate_stream,
do_unarchive_stream,
)
from zerver.actions.user_groups import (
user_groups: Make locks required for updating user group memberships. **Background** User groups are expected to comply with the DAG constraint for the many-to-many inter-group membership. The check for this constraint has to be performed recursively so that we can find all direct and indirect subgroups of the user group to be added. This kind of check is vulnerable to phantom reads which is possible at the default read committed isolation level because we cannot guarantee that the check is still valid when we are adding the subgroups to the user group. **Solution** To avoid having another transaction concurrently update one of the to-be-subgroup after the recursive check is done, and before the subgroup is added, we use SELECT FOR UPDATE to lock the user group rows. The lock needs to be acquired before a group membership change is about to occur before any check has been conducted. Suppose that we are adding subgroup B to supergroup A, the locking protocol is specified as follows: 1. Acquire a lock for B and all its direct and indirect subgroups. 2. Acquire a lock for A. For the removal of user groups, we acquire a lock for the user group to be removed with all its direct and indirect subgroups. This is the special case A=B, which is still complaint with the protocol. **Error handling** We currently rely on Postgres' deadlock detection to abort transactions and show an error for the users. In the future, we might need some recovery mechanism or at least better error handling. **Notes** An important note is that we need to reuse the recursive CTE query that finds the direct and indirect subgroups when applying the lock on the rows. And the lock needs to be acquired the same way for the addition and removal of direct subgroups. User membership change (as opposed to user group membership) is not affected. Read-only queries aren't either. The locks only protect critical regions where the user group dependency graph might violate the DAG constraint, where users are not participating. **Testing** We implement a transaction test case targeting some typical scenarios when an internal server error is expected to happen (this means that the user group view makes the correct decision to abort the transaction when something goes wrong with locks). To achieve this, we add a development view intended only for unit tests. It has a global BARRIER that can be shared across threads, so that we can synchronize them to consistently reproduce certain potential race conditions prevented by the database locks. The transaction test case lanuches pairs of threads initiating possibly conflicting requests at the same time. The tests are set up such that exactly N of them are expected to succeed with a certain error message (while we don't know each one). **Security notes** get_recursive_subgroups_for_groups will no longer fetch user groups from other realms. As a result, trying to add/remove a subgroup from another realm results in a UserGroup not found error response. We also implement subgroup-specific checks in has_user_group_access to keep permission managing in a single place. Do note that the API currently don't have a way to violate that check because we are only checking the realm ID now.
2023-06-17 04:39:52 +02:00
add_subgroups_to_user_group,
bulk_add_members_to_user_groups,
bulk_remove_members_from_user_groups,
check_add_user_group,
create_user_group_in_database,
do_change_user_group_permission_setting,
do_deactivate_user_group,
promote_new_full_members,
remove_subgroups_from_user_group,
)
from zerver.actions.users import do_deactivate_user
from zerver.lib.create_user import create_user
from zerver.lib.mention import silent_mention_syntax_for_user
from zerver.lib.streams import ensure_stream
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import most_recent_usermessage
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.user_groups import (
AnonymousSettingGroupDict,
get_direct_user_groups,
get_recursive_group_members,
get_recursive_membership_groups,
get_recursive_strict_subgroups,
get_recursive_subgroups,
get_role_based_system_groups_dict,
get_subgroup_ids,
get_user_group_member_ids,
has_user_group_access_for_subgroup,
is_any_user_in_group,
is_user_in_group,
user_groups_in_realm_serialized,
)
from zerver.models import (
GroupGroupMembership,
NamedUserGroup,
Realm,
Stream,
UserGroup,
UserGroupMembership,
UserProfile,
)
from zerver.models.groups import SystemGroups
from zerver.models.realms import get_realm
class UserGroupTestCase(ZulipTestCase):
2024-04-18 12:23:46 +02:00
def assert_user_membership(
self, user_group: NamedUserGroup, members: Iterable[UserProfile]
) -> None:
user_ids = get_user_group_member_ids(user_group, direct_member_only=True)
self.assertSetEqual(set(user_ids), {member.id for member in members})
def assert_member_not_in_group(self, user_group: NamedUserGroup, member: UserProfile) -> None:
user_ids = get_user_group_member_ids(user_group, direct_member_only=True)
self.assertNotIn(member.id, user_ids)
def assert_subgroup_membership(
2024-04-18 12:23:46 +02:00
self, user_group: NamedUserGroup, members: Iterable[UserGroup]
) -> None:
subgroup_ids = get_subgroup_ids(user_group, direct_subgroup_only=True)
self.assertSetEqual(set(subgroup_ids), {member.id for member in members})
def create_user_group_for_test(
self, group_name: str, acting_user: UserProfile
) -> NamedUserGroup:
members = [self.example_user("othello")]
return check_add_user_group(
get_realm("zulip"), group_name, members, acting_user=acting_user
)
def test_user_groups_in_realm_serialized(self) -> None:
def convert_date_created_to_timestamp(date_created: datetime | None) -> int | None:
return datetime_to_timestamp(date_created) if date_created is not None else None
realm = get_realm("zulip")
user = self.example_user("iago")
2024-04-18 12:23:46 +02:00
user_group = NamedUserGroup.objects.filter(realm=realm).first()
assert user_group is not None
empty_user_group = check_add_user_group(realm, "newgroup", [], acting_user=user)
do_deactivate_user(self.example_user("hamlet"), acting_user=None)
user_groups = user_groups_in_realm_serialized(realm, include_deactivated_groups=False)
self.assert_length(user_groups, 10)
self.assertEqual(user_groups[0]["id"], user_group.id)
self.assertEqual(user_groups[0]["creator_id"], user_group.creator_id)
self.assertEqual(
user_groups[0]["date_created"],
convert_date_created_to_timestamp(user_group.date_created),
)
self.assertEqual(user_groups[0]["name"], SystemGroups.NOBODY)
self.assertEqual(user_groups[0]["description"], "Nobody")
self.assertEqual(user_groups[0]["members"], [])
self.assertEqual(user_groups[0]["direct_subgroup_ids"], [])
self.assertEqual(user_groups[0]["can_manage_group"], user_group.id)
self.assertEqual(user_groups[0]["can_mention_group"], user_group.id)
self.assertFalse(user_groups[0]["deactivated"])
2024-04-18 12:23:46 +02:00
owners_system_group = NamedUserGroup.objects.get(name=SystemGroups.OWNERS, realm=realm)
membership = UserGroupMembership.objects.filter(user_group=owners_system_group).values_list(
"user_profile_id", flat=True
)
self.assertEqual(user_groups[1]["id"], owners_system_group.id)
self.assertEqual(user_groups[1]["creator_id"], owners_system_group.creator_id)
self.assertEqual(
user_groups[1]["date_created"],
convert_date_created_to_timestamp(owners_system_group.date_created),
)
self.assertEqual(user_groups[1]["name"], SystemGroups.OWNERS)
self.assertEqual(user_groups[1]["description"], "Owners of this organization")
self.assertEqual(set(user_groups[1]["members"]), set(membership))
self.assertEqual(user_groups[1]["direct_subgroup_ids"], [])
self.assertEqual(user_groups[1]["can_manage_group"], user_group.id)
self.assertEqual(user_groups[1]["can_mention_group"], user_group.id)
self.assertFalse(user_groups[0]["deactivated"])
2024-04-18 12:23:46 +02:00
admins_system_group = NamedUserGroup.objects.get(
name=SystemGroups.ADMINISTRATORS, realm=realm
)
self.assertEqual(user_groups[2]["id"], admins_system_group.id)
# Check that owners system group is present in "direct_subgroup_ids"
self.assertEqual(user_groups[2]["direct_subgroup_ids"], [owners_system_group.id])
self.assertEqual(user_groups[8]["name"], "hamletcharacters")
# Test deactivated user is not included in the members list.
self.assertEqual(user_groups[8]["members"], [self.example_user("cordelia").id])
everyone_group = NamedUserGroup.objects.get(
name=SystemGroups.EVERYONE, realm=realm, is_system_group=True
)
self.assertEqual(user_groups[9]["id"], empty_user_group.id)
self.assertEqual(user_groups[9]["creator_id"], empty_user_group.creator_id)
self.assertEqual(
user_groups[9]["date_created"],
convert_date_created_to_timestamp(empty_user_group.date_created),
)
self.assertEqual(user_groups[9]["name"], "newgroup")
self.assertEqual(user_groups[9]["description"], "")
self.assertEqual(user_groups[9]["members"], [])
self.assertEqual(
user_groups[9]["can_manage_group"],
AnonymousSettingGroupDict(direct_members=[11], direct_subgroups=[]),
)
self.assertEqual(user_groups[9]["can_mention_group"], everyone_group.id)
self.assertFalse(user_groups[0]["deactivated"])
othello = self.example_user("othello")
hamletcharacters_group = NamedUserGroup.objects.get(name="hamletcharacters", realm=realm)
setting_group = self.create_or_update_anonymous_group_for_setting(
[othello], [admins_system_group, hamletcharacters_group]
)
new_user_group = check_add_user_group(
realm,
"newgroup2",
[othello],
group_settings_map={
"can_manage_group": setting_group,
"can_mention_group": setting_group,
},
acting_user=self.example_user("desdemona"),
)
user_groups = user_groups_in_realm_serialized(realm, include_deactivated_groups=False)
self.assertEqual(user_groups[10]["id"], new_user_group.id)
self.assertEqual(user_groups[10]["creator_id"], new_user_group.creator_id)
self.assertEqual(
user_groups[10]["date_created"],
convert_date_created_to_timestamp(new_user_group.date_created),
)
self.assertEqual(user_groups[10]["name"], "newgroup2")
self.assertEqual(user_groups[10]["description"], "")
self.assertEqual(user_groups[10]["members"], [othello.id])
assert isinstance(user_groups[10]["can_manage_group"], AnonymousSettingGroupDict)
self.assertEqual(user_groups[10]["can_manage_group"].direct_members, [othello.id])
self.assertCountEqual(
user_groups[10]["can_manage_group"].direct_subgroups,
[admins_system_group.id, hamletcharacters_group.id],
)
assert isinstance(user_groups[10]["can_mention_group"], AnonymousSettingGroupDict)
self.assertEqual(user_groups[10]["can_mention_group"].direct_members, [othello.id])
self.assertCountEqual(
user_groups[10]["can_mention_group"].direct_subgroups,
[admins_system_group.id, hamletcharacters_group.id],
)
self.assertFalse(user_groups[0]["deactivated"])
hamlet = self.example_user("hamlet")
another_new_group = check_add_user_group(realm, "newgroup3", [hamlet], acting_user=hamlet)
add_subgroups_to_user_group(
new_user_group, [another_new_group, owners_system_group], acting_user=None
)
do_deactivate_user_group(another_new_group, acting_user=None)
user_groups = user_groups_in_realm_serialized(realm, include_deactivated_groups=True)
self.assert_length(user_groups, 12)
self.assertEqual(user_groups[10]["id"], new_user_group.id)
self.assertEqual(user_groups[10]["name"], "newgroup2")
self.assertFalse(user_groups[10]["deactivated"])
self.assertCountEqual(
user_groups[10]["direct_subgroup_ids"], [another_new_group.id, owners_system_group.id]
)
self.assertEqual(user_groups[11]["id"], another_new_group.id)
self.assertEqual(user_groups[11]["name"], "newgroup3")
self.assertTrue(user_groups[11]["deactivated"])
user_groups = user_groups_in_realm_serialized(realm, include_deactivated_groups=False)
self.assert_length(user_groups, 11)
self.assertEqual(user_groups[10]["id"], new_user_group.id)
self.assertEqual(user_groups[10]["name"], "newgroup2")
self.assertFalse(user_groups[10]["deactivated"])
self.assertCountEqual(
user_groups[10]["direct_subgroup_ids"], [another_new_group.id, owners_system_group.id]
)
def test_get_direct_user_groups(self) -> None:
othello = self.example_user("othello")
self.create_user_group_for_test("support", acting_user=self.example_user("desdemona"))
user_groups = get_direct_user_groups(othello)
self.assert_length(user_groups, 3)
# othello is a direct member of two role-based system groups also.
user_group_names = [group.named_user_group.name for group in user_groups]
self.assertEqual(
set(user_group_names),
{"support", SystemGroups.MEMBERS, SystemGroups.FULL_MEMBERS},
)
def test_recursive_queries_for_user_groups(self) -> None:
realm = get_realm("zulip")
iago = self.example_user("iago")
desdemona = self.example_user("desdemona")
shiva = self.example_user("shiva")
leadership_group = check_add_user_group(
realm, "Leadership", [desdemona], acting_user=desdemona
)
staff_group = check_add_user_group(realm, "Staff", [iago], acting_user=iago)
GroupGroupMembership.objects.create(supergroup=staff_group, subgroup=leadership_group)
everyone_group = check_add_user_group(realm, "Everyone", [shiva], acting_user=shiva)
GroupGroupMembership.objects.create(supergroup=everyone_group, subgroup=staff_group)
self.assertCountEqual(
list(get_recursive_subgroups(leadership_group)), [leadership_group.usergroup_ptr]
)
self.assertCountEqual(
list(get_recursive_subgroups(staff_group)),
[leadership_group.usergroup_ptr, staff_group.usergroup_ptr],
)
self.assertCountEqual(
list(get_recursive_subgroups(everyone_group)),
[
leadership_group.usergroup_ptr,
staff_group.usergroup_ptr,
everyone_group.usergroup_ptr,
],
)
self.assertCountEqual(list(get_recursive_strict_subgroups(leadership_group)), [])
self.assertCountEqual(list(get_recursive_strict_subgroups(staff_group)), [leadership_group])
self.assertCountEqual(
list(get_recursive_strict_subgroups(everyone_group)),
[leadership_group, staff_group],
)
self.assertCountEqual(list(get_recursive_group_members(leadership_group)), [desdemona])
self.assertCountEqual(list(get_recursive_group_members(staff_group)), [desdemona, iago])
self.assertCountEqual(
list(get_recursive_group_members(everyone_group)), [desdemona, iago, shiva]
)
self.assertIn(leadership_group.usergroup_ptr, get_recursive_membership_groups(desdemona))
self.assertIn(staff_group.usergroup_ptr, get_recursive_membership_groups(desdemona))
self.assertIn(everyone_group.usergroup_ptr, get_recursive_membership_groups(desdemona))
self.assertIn(staff_group.usergroup_ptr, get_recursive_membership_groups(iago))
self.assertIn(everyone_group.usergroup_ptr, get_recursive_membership_groups(iago))
self.assertIn(everyone_group.usergroup_ptr, get_recursive_membership_groups(shiva))
do_deactivate_user(iago, acting_user=None)
self.assertCountEqual(list(get_recursive_group_members(staff_group)), [desdemona])
self.assertCountEqual(list(get_recursive_group_members(everyone_group)), [desdemona, shiva])
def test_subgroups_of_role_based_system_groups(self) -> None:
realm = get_realm("zulip")
owners_group = NamedUserGroup.objects.get(
realm=realm, name=SystemGroups.OWNERS, is_system_group=True
)
admins_group = NamedUserGroup.objects.get(
realm=realm, name=SystemGroups.ADMINISTRATORS, is_system_group=True
)
moderators_group = NamedUserGroup.objects.get(
realm=realm, name=SystemGroups.MODERATORS, is_system_group=True
)
full_members_group = NamedUserGroup.objects.get(
realm=realm, name=SystemGroups.FULL_MEMBERS, is_system_group=True
)
members_group = NamedUserGroup.objects.get(
realm=realm, name=SystemGroups.MEMBERS, is_system_group=True
)
everyone_group = NamedUserGroup.objects.get(
realm=realm, name=SystemGroups.EVERYONE, is_system_group=True
)
everyone_on_internet_group = NamedUserGroup.objects.get(
realm=realm,
name=SystemGroups.EVERYONE_ON_INTERNET,
is_system_group=True,
)
self.assertCountEqual(list(get_recursive_strict_subgroups(owners_group)), [])
self.assertCountEqual(list(get_recursive_strict_subgroups(admins_group)), [owners_group])
self.assertCountEqual(
list(get_recursive_strict_subgroups(moderators_group)),
[owners_group, admins_group],
)
self.assertCountEqual(
list(get_recursive_strict_subgroups(full_members_group)),
[owners_group, admins_group, moderators_group],
)
self.assertCountEqual(
list(get_recursive_strict_subgroups(members_group)),
[owners_group, admins_group, moderators_group, full_members_group],
)
self.assertCountEqual(
list(get_recursive_strict_subgroups(everyone_group)),
[
owners_group,
admins_group,
moderators_group,
full_members_group,
members_group,
],
)
self.assertCountEqual(
list(get_recursive_strict_subgroups(everyone_on_internet_group)),
[
owners_group,
admins_group,
moderators_group,
full_members_group,
members_group,
everyone_group,
],
)
def test_is_user_in_group(self) -> None:
realm = get_realm("zulip")
shiva = self.example_user("shiva")
iago = self.example_user("iago")
hamlet = self.example_user("hamlet")
2024-04-18 12:23:46 +02:00
moderators_group = NamedUserGroup.objects.get(
name=SystemGroups.MODERATORS, realm=realm, is_system_group=True
)
2024-04-18 12:23:46 +02:00
administrators_group = NamedUserGroup.objects.get(
name=SystemGroups.ADMINISTRATORS, realm=realm, is_system_group=True
)
self.assertTrue(is_user_in_group(moderators_group, shiva))
# Iago is member of a subgroup of moderators group.
self.assertTrue(is_user_in_group(moderators_group, iago))
self.assertFalse(is_user_in_group(moderators_group, iago, direct_member_only=True))
self.assertTrue(is_user_in_group(administrators_group, iago, direct_member_only=True))
self.assertFalse(is_user_in_group(moderators_group, hamlet))
self.assertFalse(is_user_in_group(moderators_group, hamlet, direct_member_only=True))
do_deactivate_user(iago, acting_user=None)
self.assertFalse(is_user_in_group(moderators_group, iago))
self.assertFalse(is_user_in_group(administrators_group, iago, direct_member_only=True))
def test_is_any_user_in_group(self) -> None:
realm = get_realm("zulip")
shiva = self.example_user("shiva").id
iago = self.example_user("iago").id
hamlet = self.example_user("hamlet").id
polonius = self.example_user("polonius").id
moderators_group = NamedUserGroup.objects.get(
name=SystemGroups.MODERATORS, realm=realm, is_system_group=True
)
administrators_group = NamedUserGroup.objects.get(
name=SystemGroups.ADMINISTRATORS, realm=realm, is_system_group=True
)
self.assertTrue(is_any_user_in_group(moderators_group, [shiva, hamlet, polonius]))
# Iago is member of a subgroup of moderators group.
self.assertTrue(is_any_user_in_group(moderators_group, [iago, hamlet, polonius]))
self.assertFalse(
is_any_user_in_group(
moderators_group, [iago, hamlet, polonius], direct_member_only=True
)
)
self.assertTrue(
is_any_user_in_group(
administrators_group, [iago, shiva, hamlet], direct_member_only=True
)
)
self.assertFalse(is_any_user_in_group(moderators_group, [hamlet, polonius]))
self.assertFalse(is_any_user_in_group(moderators_group, [hamlet], direct_member_only=True))
def test_has_user_group_access_for_subgroup(self) -> None:
user_groups: Make locks required for updating user group memberships. **Background** User groups are expected to comply with the DAG constraint for the many-to-many inter-group membership. The check for this constraint has to be performed recursively so that we can find all direct and indirect subgroups of the user group to be added. This kind of check is vulnerable to phantom reads which is possible at the default read committed isolation level because we cannot guarantee that the check is still valid when we are adding the subgroups to the user group. **Solution** To avoid having another transaction concurrently update one of the to-be-subgroup after the recursive check is done, and before the subgroup is added, we use SELECT FOR UPDATE to lock the user group rows. The lock needs to be acquired before a group membership change is about to occur before any check has been conducted. Suppose that we are adding subgroup B to supergroup A, the locking protocol is specified as follows: 1. Acquire a lock for B and all its direct and indirect subgroups. 2. Acquire a lock for A. For the removal of user groups, we acquire a lock for the user group to be removed with all its direct and indirect subgroups. This is the special case A=B, which is still complaint with the protocol. **Error handling** We currently rely on Postgres' deadlock detection to abort transactions and show an error for the users. In the future, we might need some recovery mechanism or at least better error handling. **Notes** An important note is that we need to reuse the recursive CTE query that finds the direct and indirect subgroups when applying the lock on the rows. And the lock needs to be acquired the same way for the addition and removal of direct subgroups. User membership change (as opposed to user group membership) is not affected. Read-only queries aren't either. The locks only protect critical regions where the user group dependency graph might violate the DAG constraint, where users are not participating. **Testing** We implement a transaction test case targeting some typical scenarios when an internal server error is expected to happen (this means that the user group view makes the correct decision to abort the transaction when something goes wrong with locks). To achieve this, we add a development view intended only for unit tests. It has a global BARRIER that can be shared across threads, so that we can synchronize them to consistently reproduce certain potential race conditions prevented by the database locks. The transaction test case lanuches pairs of threads initiating possibly conflicting requests at the same time. The tests are set up such that exactly N of them are expected to succeed with a certain error message (while we don't know each one). **Security notes** get_recursive_subgroups_for_groups will no longer fetch user groups from other realms. As a result, trying to add/remove a subgroup from another realm results in a UserGroup not found error response. We also implement subgroup-specific checks in has_user_group_access to keep permission managing in a single place. Do note that the API currently don't have a way to violate that check because we are only checking the realm ID now.
2023-06-17 04:39:52 +02:00
iago = self.example_user("iago")
zulip_realm = get_realm("zulip")
zulip_group = check_add_user_group(zulip_realm, "zulip", [], acting_user=iago)
2024-04-18 12:23:46 +02:00
moderators_group = NamedUserGroup.objects.get(
name=SystemGroups.MODERATORS, realm=zulip_realm, is_system_group=True
user_groups: Make locks required for updating user group memberships. **Background** User groups are expected to comply with the DAG constraint for the many-to-many inter-group membership. The check for this constraint has to be performed recursively so that we can find all direct and indirect subgroups of the user group to be added. This kind of check is vulnerable to phantom reads which is possible at the default read committed isolation level because we cannot guarantee that the check is still valid when we are adding the subgroups to the user group. **Solution** To avoid having another transaction concurrently update one of the to-be-subgroup after the recursive check is done, and before the subgroup is added, we use SELECT FOR UPDATE to lock the user group rows. The lock needs to be acquired before a group membership change is about to occur before any check has been conducted. Suppose that we are adding subgroup B to supergroup A, the locking protocol is specified as follows: 1. Acquire a lock for B and all its direct and indirect subgroups. 2. Acquire a lock for A. For the removal of user groups, we acquire a lock for the user group to be removed with all its direct and indirect subgroups. This is the special case A=B, which is still complaint with the protocol. **Error handling** We currently rely on Postgres' deadlock detection to abort transactions and show an error for the users. In the future, we might need some recovery mechanism or at least better error handling. **Notes** An important note is that we need to reuse the recursive CTE query that finds the direct and indirect subgroups when applying the lock on the rows. And the lock needs to be acquired the same way for the addition and removal of direct subgroups. User membership change (as opposed to user group membership) is not affected. Read-only queries aren't either. The locks only protect critical regions where the user group dependency graph might violate the DAG constraint, where users are not participating. **Testing** We implement a transaction test case targeting some typical scenarios when an internal server error is expected to happen (this means that the user group view makes the correct decision to abort the transaction when something goes wrong with locks). To achieve this, we add a development view intended only for unit tests. It has a global BARRIER that can be shared across threads, so that we can synchronize them to consistently reproduce certain potential race conditions prevented by the database locks. The transaction test case lanuches pairs of threads initiating possibly conflicting requests at the same time. The tests are set up such that exactly N of them are expected to succeed with a certain error message (while we don't know each one). **Security notes** get_recursive_subgroups_for_groups will no longer fetch user groups from other realms. As a result, trying to add/remove a subgroup from another realm results in a UserGroup not found error response. We also implement subgroup-specific checks in has_user_group_access to keep permission managing in a single place. Do note that the API currently don't have a way to violate that check because we are only checking the realm ID now.
2023-06-17 04:39:52 +02:00
)
lear_realm = get_realm("lear")
lear_group = check_add_user_group(lear_realm, "test", [], acting_user=iago)
user_groups: Make locks required for updating user group memberships. **Background** User groups are expected to comply with the DAG constraint for the many-to-many inter-group membership. The check for this constraint has to be performed recursively so that we can find all direct and indirect subgroups of the user group to be added. This kind of check is vulnerable to phantom reads which is possible at the default read committed isolation level because we cannot guarantee that the check is still valid when we are adding the subgroups to the user group. **Solution** To avoid having another transaction concurrently update one of the to-be-subgroup after the recursive check is done, and before the subgroup is added, we use SELECT FOR UPDATE to lock the user group rows. The lock needs to be acquired before a group membership change is about to occur before any check has been conducted. Suppose that we are adding subgroup B to supergroup A, the locking protocol is specified as follows: 1. Acquire a lock for B and all its direct and indirect subgroups. 2. Acquire a lock for A. For the removal of user groups, we acquire a lock for the user group to be removed with all its direct and indirect subgroups. This is the special case A=B, which is still complaint with the protocol. **Error handling** We currently rely on Postgres' deadlock detection to abort transactions and show an error for the users. In the future, we might need some recovery mechanism or at least better error handling. **Notes** An important note is that we need to reuse the recursive CTE query that finds the direct and indirect subgroups when applying the lock on the rows. And the lock needs to be acquired the same way for the addition and removal of direct subgroups. User membership change (as opposed to user group membership) is not affected. Read-only queries aren't either. The locks only protect critical regions where the user group dependency graph might violate the DAG constraint, where users are not participating. **Testing** We implement a transaction test case targeting some typical scenarios when an internal server error is expected to happen (this means that the user group view makes the correct decision to abort the transaction when something goes wrong with locks). To achieve this, we add a development view intended only for unit tests. It has a global BARRIER that can be shared across threads, so that we can synchronize them to consistently reproduce certain potential race conditions prevented by the database locks. The transaction test case lanuches pairs of threads initiating possibly conflicting requests at the same time. The tests are set up such that exactly N of them are expected to succeed with a certain error message (while we don't know each one). **Security notes** get_recursive_subgroups_for_groups will no longer fetch user groups from other realms. As a result, trying to add/remove a subgroup from another realm results in a UserGroup not found error response. We also implement subgroup-specific checks in has_user_group_access to keep permission managing in a single place. Do note that the API currently don't have a way to violate that check because we are only checking the realm ID now.
2023-06-17 04:39:52 +02:00
self.assertFalse(has_user_group_access_for_subgroup(lear_group, iago))
self.assertTrue(has_user_group_access_for_subgroup(zulip_group, iago))
self.assertTrue(has_user_group_access_for_subgroup(moderators_group, iago))
user_groups: Make locks required for updating user group memberships. **Background** User groups are expected to comply with the DAG constraint for the many-to-many inter-group membership. The check for this constraint has to be performed recursively so that we can find all direct and indirect subgroups of the user group to be added. This kind of check is vulnerable to phantom reads which is possible at the default read committed isolation level because we cannot guarantee that the check is still valid when we are adding the subgroups to the user group. **Solution** To avoid having another transaction concurrently update one of the to-be-subgroup after the recursive check is done, and before the subgroup is added, we use SELECT FOR UPDATE to lock the user group rows. The lock needs to be acquired before a group membership change is about to occur before any check has been conducted. Suppose that we are adding subgroup B to supergroup A, the locking protocol is specified as follows: 1. Acquire a lock for B and all its direct and indirect subgroups. 2. Acquire a lock for A. For the removal of user groups, we acquire a lock for the user group to be removed with all its direct and indirect subgroups. This is the special case A=B, which is still complaint with the protocol. **Error handling** We currently rely on Postgres' deadlock detection to abort transactions and show an error for the users. In the future, we might need some recovery mechanism or at least better error handling. **Notes** An important note is that we need to reuse the recursive CTE query that finds the direct and indirect subgroups when applying the lock on the rows. And the lock needs to be acquired the same way for the addition and removal of direct subgroups. User membership change (as opposed to user group membership) is not affected. Read-only queries aren't either. The locks only protect critical regions where the user group dependency graph might violate the DAG constraint, where users are not participating. **Testing** We implement a transaction test case targeting some typical scenarios when an internal server error is expected to happen (this means that the user group view makes the correct decision to abort the transaction when something goes wrong with locks). To achieve this, we add a development view intended only for unit tests. It has a global BARRIER that can be shared across threads, so that we can synchronize them to consistently reproduce certain potential race conditions prevented by the database locks. The transaction test case lanuches pairs of threads initiating possibly conflicting requests at the same time. The tests are set up such that exactly N of them are expected to succeed with a certain error message (while we don't know each one). **Security notes** get_recursive_subgroups_for_groups will no longer fetch user groups from other realms. As a result, trying to add/remove a subgroup from another realm results in a UserGroup not found error response. We also implement subgroup-specific checks in has_user_group_access to keep permission managing in a single place. Do note that the API currently don't have a way to violate that check because we are only checking the realm ID now.
2023-06-17 04:39:52 +02:00
class UserGroupAPITestCase(UserGroupTestCase):
def test_user_group_create(self) -> None:
hamlet = self.example_user("hamlet")
# Test success
self.login("hamlet")
params = {
"name": "support",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Support team",
}
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_success(result)
2024-04-18 12:23:46 +02:00
self.assert_length(NamedUserGroup.objects.filter(realm=hamlet.realm), 10)
# Check default value of settings.
2024-04-18 12:23:46 +02:00
everyone_system_group = NamedUserGroup.objects.get(
name="role:everyone", realm=hamlet.realm, is_system_group=True
)
2024-04-18 12:23:46 +02:00
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
self.assertCountEqual(support_group.can_manage_group.direct_members.all(), [hamlet])
2024-04-18 12:23:46 +02:00
self.assertEqual(support_group.can_mention_group, everyone_system_group.usergroup_ptr)
# Test invalid member error
params = {
"name": "backend",
"members": orjson.dumps([1111]).decode(),
"description": "Backend team",
}
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "Invalid user ID: 1111")
2024-04-18 12:23:46 +02:00
self.assert_length(NamedUserGroup.objects.filter(realm=hamlet.realm), 10)
# Test we cannot create group with same name again
params = {
"name": "support",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Support team",
}
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "User group 'support' already exists.")
2024-04-18 12:23:46 +02:00
self.assert_length(NamedUserGroup.objects.filter(realm=hamlet.realm), 10)
# Test we cannot create group with same name again
params = {
"name": "a" * (NamedUserGroup.MAX_NAME_LENGTH + 1),
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Test group",
}
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "User group name cannot exceed 100 characters.")
2024-04-18 12:23:46 +02:00
self.assert_length(NamedUserGroup.objects.filter(realm=hamlet.realm), 10)
# Test emtpty group name.
params = {
"name": "",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Test empty group",
}
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "User group name can't be empty!")
2024-04-18 12:23:46 +02:00
self.assert_length(NamedUserGroup.objects.filter(realm=hamlet.realm), 10)
# Test invalid prefixes for user group name.
params = {
"name": "@test",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Test group",
}
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "User group name cannot start with '@'.")
2024-04-18 12:23:46 +02:00
self.assert_length(NamedUserGroup.objects.filter(realm=hamlet.realm), 10)
params["name"] = "role:manager"
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "User group name cannot start with 'role:'.")
2024-04-18 12:23:46 +02:00
self.assert_length(NamedUserGroup.objects.filter(realm=hamlet.realm), 10)
params["name"] = "user:1"
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "User group name cannot start with 'user:'.")
2024-04-18 12:23:46 +02:00
self.assert_length(NamedUserGroup.objects.filter(realm=hamlet.realm), 10)
params["name"] = "stream:1"
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "User group name cannot start with 'stream:'.")
2024-04-18 12:23:46 +02:00
self.assert_length(NamedUserGroup.objects.filter(realm=hamlet.realm), 10)
params["name"] = "channel:1"
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "User group name cannot start with 'channel:'.")
2024-04-18 12:23:46 +02:00
self.assert_length(NamedUserGroup.objects.filter(realm=hamlet.realm), 10)
def do_test_set_group_setting_during_user_group_creation(self, setting_name: str) -> None:
self.login("hamlet")
hamlet = self.example_user("hamlet")
# Delete all existing user groups except the hamletcharacters group
NamedUserGroup.objects.exclude(name="hamletcharacters").filter(
is_system_group=False
).delete()
permission_configuration = NamedUserGroup.GROUP_PERMISSION_SETTINGS[setting_name]
leadership_group = check_add_user_group(
hamlet.realm, "leadership", [hamlet], acting_user=hamlet
)
2024-04-18 12:23:46 +02:00
moderators_group = NamedUserGroup.objects.get(
name="role:moderators", realm=hamlet.realm, is_system_group=True
)
params = {
"name": "support",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Support team",
}
params[setting_name] = orjson.dumps(moderators_group.id).decode()
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_success(result)
2024-04-18 12:23:46 +02:00
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
self.assertEqual(getattr(support_group, setting_name), moderators_group.usergroup_ptr)
params = {
"name": "test",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Test group",
}
params[setting_name] = orjson.dumps(leadership_group.id).decode()
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_success(result)
2024-04-18 12:23:46 +02:00
test_group = NamedUserGroup.objects.get(name="test", realm=hamlet.realm)
self.assertEqual(getattr(test_group, setting_name), leadership_group.usergroup_ptr)
2024-04-18 12:23:46 +02:00
nobody_group = NamedUserGroup.objects.get(
name="role:nobody", realm=hamlet.realm, is_system_group=True
)
params = {
"name": "marketing",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Marketing team",
}
params[setting_name] = orjson.dumps(nobody_group.id).decode()
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_success(result)
2024-04-18 12:23:46 +02:00
marketing_group = NamedUserGroup.objects.get(name="marketing", realm=hamlet.realm)
self.assertEqual(getattr(marketing_group, setting_name), nobody_group.usergroup_ptr)
othello = self.example_user("othello")
params = {
"name": "backend",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Backend team",
}
params[setting_name] = orjson.dumps(
{
"direct_members": [othello.id],
"direct_subgroups": [leadership_group.id, moderators_group.id],
}
).decode()
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_success(result)
backend_group = NamedUserGroup.objects.get(name="backend", realm=hamlet.realm)
self.assertCountEqual(
list(getattr(backend_group, setting_name).direct_members.all()),
[othello],
)
self.assertCountEqual(
list(getattr(backend_group, setting_name).direct_subgroups.all()),
[leadership_group, moderators_group],
)
params = {
"name": "help",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Troubleshooting team",
}
params[setting_name] = orjson.dumps(
{
"direct_members": [],
"direct_subgroups": [moderators_group.id],
}
).decode()
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_success(result)
help_group = NamedUserGroup.objects.get(name="help", realm=hamlet.realm)
# We do not create a new UserGroup object in such case.
self.assertEqual(getattr(help_group, setting_name).id, moderators_group.id)
2024-04-18 12:23:46 +02:00
internet_group = NamedUserGroup.objects.get(
name="role:internet", realm=hamlet.realm, is_system_group=True
)
params = {
"name": "frontend",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Frontend team",
}
params[setting_name] = orjson.dumps(internet_group.id).decode()
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(
result, f"'{setting_name}' setting cannot be set to 'role:internet' group."
)
2024-04-18 12:23:46 +02:00
owners_group = NamedUserGroup.objects.get(
name="role:owners", realm=hamlet.realm, is_system_group=True
)
params = {
"name": "frontend-team",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Frontend team",
}
params[setting_name] = orjson.dumps(owners_group.id).decode()
result = self.client_post("/json/user_groups/create", info=params)
if not permission_configuration.allow_owners_group:
self.assert_json_error(
result, f"'{setting_name}' setting cannot be set to 'role:owners' group."
)
else:
self.assert_json_success(result)
frontend_group = NamedUserGroup.objects.get(name="frontend-team", realm=hamlet.realm)
self.assertEqual(getattr(frontend_group, setting_name), owners_group.usergroup_ptr)
params = {
"name": "frontend",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Frontend team",
}
params[setting_name] = orjson.dumps(1111).decode()
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "Invalid user group")
params = {
"name": "frontend",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Frontend team",
}
params[setting_name] = orjson.dumps(
{
"direct_members": [1111],
"direct_subgroups": [leadership_group.id, moderators_group.id],
}
).decode()
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "Invalid user ID: 1111")
params = {
"name": "frontend",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Frontend team",
}
params[setting_name] = orjson.dumps(
{
"direct_members": [othello.id],
"direct_subgroups": [1111, moderators_group.id],
}
).decode()
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "Invalid user group ID: 1111")
# Test can_mention_group cannot be set to a deactivated group.
do_deactivate_user_group(leadership_group, acting_user=None)
params = {
"name": "social",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Social team",
"can_mention_group": orjson.dumps(leadership_group.id).decode(),
}
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "User group is deactivated.")
params = {
"name": "social",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Social team",
"can_mention_group": orjson.dumps(
{
"direct_members": [othello.id],
"direct_subgroups": [leadership_group.id],
}
).decode(),
}
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_error(result, "User group is deactivated.")
# Reactivate group to use it in further tests.
leadership_group.deactivated = False
leadership_group.save()
def test_set_group_settings_during_user_group_creation(self) -> None:
for setting_name in NamedUserGroup.GROUP_PERMISSION_SETTINGS:
self.do_test_set_group_setting_during_user_group_creation(setting_name)
def test_user_group_get(self) -> None:
# Test success
user_profile = self.example_user("hamlet")
self.login_user(user_profile)
result = self.client_get("/json/user_groups")
response_dict = self.assert_json_success(result)
self.assert_length(
2024-04-18 12:23:46 +02:00
response_dict["user_groups"],
NamedUserGroup.objects.filter(realm=user_profile.realm).count(),
)
def test_user_group_update(self) -> None:
hamlet = self.example_user("hamlet")
self.login("hamlet")
params = {
"name": "support",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Support team",
}
self.client_post("/json/user_groups/create", info=params)
2024-04-18 12:23:46 +02:00
user_group = NamedUserGroup.objects.get(name="support")
# Test success
params = {
"name": "help",
"description": "Troubleshooting team",
}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_success(result)
2024-04-18 12:23:46 +02:00
user_group = NamedUserGroup.objects.get(id=user_group.id)
self.assertEqual(user_group.name, "help")
self.assertEqual(user_group.description, "Troubleshooting team")
# Test when new data is not supplied.
result = self.client_patch(f"/json/user_groups/{user_group.id}", info={})
self.assert_json_error(result, "No new data supplied")
# Test when only one of name or description is supplied.
params = {"name": "help team"}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_success(result)
2024-04-18 12:23:46 +02:00
user_group = NamedUserGroup.objects.get(id=user_group.id)
self.assertEqual(user_group.name, "help team")
self.assertEqual(user_group.description, "Troubleshooting team")
# Test when invalid user group is supplied
params = {"name": "help"}
result = self.client_patch("/json/user_groups/1111", info=params)
self.assert_json_error(result, "Invalid user group")
lear_realm = get_realm("lear")
lear_cordelia = self.lear_user("cordelia")
lear_test_group = check_add_user_group(
lear_realm, "test", [lear_cordelia], acting_user=lear_cordelia
)
result = self.client_patch(f"/json/user_groups/{lear_test_group.id}", info=params)
self.assert_json_error(result, "Invalid user group")
params = {"name": "a" * (NamedUserGroup.MAX_NAME_LENGTH + 1)}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_error(result, "User group name cannot exceed 100 characters.")
# Test emtpty group name.
params = {"name": ""}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_error(result, "User group name can't be empty!")
# Test invalid prefixes for user group name.
params = {"name": "@test"}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_error(result, "User group name cannot start with '@'.")
params = {"name": "role:manager"}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_error(result, "User group name cannot start with 'role:'.")
params = {"name": "user:1"}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_error(result, "User group name cannot start with 'user:'.")
params = {"name": "stream:1"}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_error(result, "User group name cannot start with 'stream:'.")
params = {"name": "channel:1"}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_error(result, "User group name cannot start with 'channel:'.")
do_deactivate_user_group(user_group, acting_user=None)
params = {"description": "Troubleshooting and support team"}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_error(result, "You can only change name of deactivated user groups")
params = {"name": "Support team"}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_success(result)
user_group = NamedUserGroup.objects.get(id=user_group.id)
self.assertEqual(user_group.name, "Support team")
def do_test_update_user_group_permission_settings(self, setting_name: str) -> None:
hamlet = self.example_user("hamlet")
permission_configuration = NamedUserGroup.GROUP_PERMISSION_SETTINGS[setting_name]
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
marketing_group = NamedUserGroup.objects.get(name="marketing", realm=hamlet.realm)
2024-04-18 12:23:46 +02:00
moderators_group = NamedUserGroup.objects.get(
name="role:moderators", realm=hamlet.realm, is_system_group=True
)
self.login("desdemona")
params = {}
params[setting_name] = orjson.dumps(
{
"new": moderators_group.id,
}
).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
2024-04-18 12:23:46 +02:00
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
self.assertEqual(getattr(support_group, setting_name), moderators_group.usergroup_ptr)
params[setting_name] = orjson.dumps(
{
"new": marketing_group.id,
}
).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
2024-04-18 12:23:46 +02:00
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
self.assertEqual(getattr(support_group, setting_name), marketing_group.usergroup_ptr)
2024-04-18 12:23:46 +02:00
nobody_group = NamedUserGroup.objects.get(
name="role:nobody", realm=hamlet.realm, is_system_group=True
)
params[setting_name] = orjson.dumps({"new": nobody_group.id}).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
2024-04-18 12:23:46 +02:00
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
self.assertEqual(getattr(support_group, setting_name), nobody_group.usergroup_ptr)
othello = self.example_user("othello")
params[setting_name] = orjson.dumps(
{
"new": {
"direct_members": [othello.id],
"direct_subgroups": [moderators_group.id, marketing_group.id],
}
}
).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
self.assertCountEqual(
list(getattr(support_group, setting_name).direct_members.all()),
[othello],
)
self.assertCountEqual(
list(getattr(support_group, setting_name).direct_subgroups.all()),
[marketing_group, moderators_group],
)
prospero = self.example_user("prospero")
params[setting_name] = orjson.dumps(
{
"new": {
"direct_members": [othello.id, prospero.id],
"direct_subgroups": [moderators_group.id, marketing_group.id],
}
}
).decode()
previous_setting_id = getattr(support_group, setting_name).id
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
# Test that the existing UserGroup object is updated.
self.assertEqual(getattr(support_group, setting_name).id, previous_setting_id)
self.assertCountEqual(
list(getattr(support_group, setting_name).direct_members.all()),
[othello, prospero],
)
self.assertCountEqual(
list(getattr(support_group, setting_name).direct_subgroups.all()),
[marketing_group, moderators_group],
)
params[setting_name] = orjson.dumps({"new": marketing_group.id}).decode()
previous_setting_id = getattr(support_group, setting_name).id
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
# Test that the previous UserGroup object is deleted.
self.assertFalse(UserGroup.objects.filter(id=previous_setting_id).exists())
self.assertEqual(getattr(support_group, setting_name).id, marketing_group.id)
2024-04-18 12:23:46 +02:00
owners_group = NamedUserGroup.objects.get(
name="role:owners", realm=hamlet.realm, is_system_group=True
)
params[setting_name] = orjson.dumps({"new": owners_group.id}).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
if not permission_configuration.allow_owners_group:
self.assert_json_error(
result, f"'{setting_name}' setting cannot be set to 'role:owners' group."
)
else:
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
self.assertEqual(getattr(support_group, setting_name).id, owners_group.id)
2024-04-18 12:23:46 +02:00
internet_group = NamedUserGroup.objects.get(
name="role:internet", realm=hamlet.realm, is_system_group=True
)
params[setting_name] = orjson.dumps({"new": internet_group.id}).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(
result, f"'{setting_name}' setting cannot be set to 'role:internet' group."
)
params[setting_name] = orjson.dumps({"new": 1111}).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "Invalid user group")
params[setting_name] = orjson.dumps(
{
"new": {
"direct_members": [1111, othello.id],
"direct_subgroups": [moderators_group.id, marketing_group.id],
}
}
).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "Invalid user ID: 1111")
params[setting_name] = orjson.dumps(
{
"new": {
"direct_members": [prospero.id, othello.id],
"direct_subgroups": [1111, marketing_group.id],
}
}
).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "Invalid user group ID: 1111")
leadership_group = NamedUserGroup.objects.get(realm=hamlet.realm, name="leadership")
do_deactivate_user_group(leadership_group, acting_user=None)
params[setting_name] = orjson.dumps({"new": leadership_group.id}).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "User group is deactivated.")
params[setting_name] = orjson.dumps(
{
"new": {
"direct_members": [prospero.id],
"direct_subgroups": [leadership_group.id],
}
}
).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "User group is deactivated.")
params[setting_name] = orjson.dumps({"new": moderators_group.id}).decode()
result = self.client_patch(f"/json/user_groups/{leadership_group.id}", info=params)
self.assert_json_error(result, "You can only change name of deactivated user groups")
leadership_group.deactivated = False
leadership_group.save()
def test_update_user_group_permission_settings(self) -> None:
hamlet = self.example_user("hamlet")
check_add_user_group(hamlet.realm, "support", [hamlet], acting_user=hamlet)
check_add_user_group(hamlet.realm, "marketing", [hamlet], acting_user=hamlet)
check_add_user_group(hamlet.realm, "leadership", [hamlet], acting_user=hamlet)
for setting_name in NamedUserGroup.GROUP_PERMISSION_SETTINGS:
self.do_test_update_user_group_permission_settings(setting_name)
def test_user_group_update_to_already_existing_name(self) -> None:
hamlet = self.example_user("hamlet")
self.login_user(hamlet)
realm = get_realm("zulip")
support_user_group = check_add_user_group(realm, "support", [hamlet], acting_user=hamlet)
marketing_user_group = check_add_user_group(
realm, "marketing", [hamlet], acting_user=hamlet
)
params = {
"name": marketing_user_group.name,
}
result = self.client_patch(f"/json/user_groups/{support_user_group.id}", info=params)
self.assert_json_error(result, f"User group '{marketing_user_group.name}' already exists.")
def test_update_can_mention_group_setting_with_previous_value_passed(self) -> None:
hamlet = self.example_user("hamlet")
support_group = check_add_user_group(hamlet.realm, "support", [hamlet], acting_user=hamlet)
marketing_group = check_add_user_group(
hamlet.realm, "marketing", [hamlet], acting_user=hamlet
)
everyone_group = NamedUserGroup.objects.get(
name="role:everyone", realm=hamlet.realm, is_system_group=True
)
moderators_group = NamedUserGroup.objects.get(
name="role:moderators", realm=hamlet.realm, is_system_group=True
)
self.assertEqual(marketing_group.can_mention_group.id, everyone_group.id)
self.login("hamlet")
params = {
"can_mention_group": orjson.dumps(
{
"new": marketing_group.id,
"old": moderators_group.id,
}
).decode()
}
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "'old' value does not match the expected value.")
othello = self.example_user("othello")
params = {
"can_mention_group": orjson.dumps(
{
"new": marketing_group.id,
"old": {
"direct_members": [othello.id],
"direct_subgroups": [everyone_group.id],
},
}
).decode()
}
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "'old' value does not match the expected value.")
params = {
"can_mention_group": orjson.dumps(
{
"new": marketing_group.id,
"old": everyone_group.id,
}
).decode()
}
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
self.assertEqual(support_group.can_mention_group, marketing_group.usergroup_ptr)
params = {
"can_mention_group": orjson.dumps(
{
"new": {
"direct_members": [othello.id],
"direct_subgroups": [moderators_group.id],
},
"old": {"direct_members": [], "direct_subgroups": [marketing_group.id]},
}
).decode()
}
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
self.assertCountEqual(
list(support_group.can_mention_group.direct_members.all()),
[othello],
)
self.assertCountEqual(
list(support_group.can_mention_group.direct_subgroups.all()),
[moderators_group],
)
params = {
"can_mention_group": orjson.dumps(
{
"new": {
"direct_members": [hamlet.id],
"direct_subgroups": [marketing_group.id],
},
"old": support_group.can_mention_group_id,
}
).decode()
}
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "'old' value does not match the expected value.")
params = {
"can_mention_group": orjson.dumps(
{
"new": {
"direct_members": [hamlet.id],
"direct_subgroups": [marketing_group.id],
},
"old": moderators_group.id,
}
).decode()
}
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "'old' value does not match the expected value.")
params = {
"can_mention_group": orjson.dumps(
{
"new": {
"direct_members": [hamlet.id],
"direct_subgroups": [marketing_group.id],
},
"old": {
"direct_members": [othello.id],
"direct_subgroups": [moderators_group.id],
},
}
).decode()
}
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
self.assertCountEqual(
list(support_group.can_mention_group.direct_members.all()),
[hamlet],
)
self.assertCountEqual(
list(support_group.can_mention_group.direct_subgroups.all()),
[marketing_group],
)
# Test error cases for completeness.
params = {
"can_mention_group": orjson.dumps(
{
"new": {
"direct_members": [othello.id],
"direct_subgroups": [moderators_group.id],
},
"old": {
"direct_members": [hamlet.id],
"direct_subgroups": [1111],
},
}
).decode()
}
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "'old' value does not match the expected value.")
params = {
"can_mention_group": orjson.dumps(
{
"new": 1111,
"old": {
"direct_members": [hamlet.id],
"direct_subgroups": [marketing_group.id],
},
}
).decode()
}
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_error(result, "Invalid user group")
def test_user_group_deactivation(self) -> None:
support_group = self.create_user_group_for_test(
"support", acting_user=self.example_user("desdemona")
)
leadership_group = self.create_user_group_for_test(
"leadership", acting_user=self.example_user("othello")
)
add_subgroups_to_user_group(support_group, [leadership_group], acting_user=None)
realm = get_realm("zulip")
admins_group = NamedUserGroup.objects.get(name=SystemGroups.ADMINISTRATORS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
admins_group,
acting_user=None,
)
self.login("othello")
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_error(result, "Insufficient permission")
members_group = NamedUserGroup.objects.get(name=SystemGroups.MEMBERS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
members_group,
acting_user=None,
)
self.login("othello")
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=realm)
self.assertTrue(support_group.deactivated)
support_group.deactivated = False
support_group.save()
# Check admins can deactivate groups even if they are not members
# of the group.
self.login("iago")
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=realm)
self.assertTrue(support_group.deactivated)
support_group.deactivated = False
support_group.save()
# Check moderators can deactivate groups if they are allowed by
# can_manage_all_groups even when they are not members of the group.
admins_group = NamedUserGroup.objects.get(name=SystemGroups.ADMINISTRATORS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
admins_group,
acting_user=None,
)
self.login("shiva")
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_error(result, "Insufficient permission")
moderators_group = NamedUserGroup.objects.get(name=SystemGroups.MODERATORS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
moderators_group,
acting_user=None,
)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=realm)
self.assertTrue(support_group.deactivated)
support_group.deactivated = False
support_group.save()
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
admins_group,
acting_user=None,
)
admins_group = NamedUserGroup.objects.get(
name=SystemGroups.ADMINISTRATORS, realm=realm, is_system_group=True
)
moderators_group = NamedUserGroup.objects.get(
name=SystemGroups.MODERATORS, realm=realm, is_system_group=True
)
do_change_user_group_permission_setting(
support_group, "can_manage_group", admins_group, acting_user=None
)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_error(result, "Insufficient permission")
do_change_user_group_permission_setting(
support_group, "can_manage_group", moderators_group, acting_user=None
)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=realm)
self.assertTrue(support_group.deactivated)
support_group.deactivated = False
support_group.save()
setting_group = self.create_or_update_anonymous_group_for_setting(
[self.example_user("shiva")], [admins_group]
)
do_change_user_group_permission_setting(
support_group, "can_manage_group", setting_group, acting_user=None
)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=realm)
self.assertTrue(support_group.deactivated)
support_group.deactivated = False
support_group.save()
moderators_group = NamedUserGroup.objects.get(
name=SystemGroups.MODERATORS, realm=realm, is_system_group=True
)
do_change_realm_permission_group_setting(
realm, "can_manage_all_groups", moderators_group, acting_user=None
)
# Check that group that is subgroup of another group cannot be deactivated.
result = self.client_post(f"/json/user_groups/{leadership_group.id}/deactivate")
self.assert_json_error(result, "Cannot deactivate user group in use.")
data = orjson.loads(result.content)
self.assertEqual(
data["objections"], [{"type": "subgroup", "supergroup_ids": [support_group.id]}]
)
# If the supergroup is itself deactivated, then subgroup can be deactivated.
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_success(result)
result = self.client_post(f"/json/user_groups/{leadership_group.id}/deactivate")
self.assert_json_success(result)
leadership_group = NamedUserGroup.objects.get(name="leadership", realm=realm)
self.assertTrue(leadership_group.deactivated)
# Check that system groups cannot be deactivated at all.
self.login("desdemona")
members_system_group = NamedUserGroup.objects.get(
name=SystemGroups.MEMBERS, realm=realm, is_system_group=True
)
result = self.client_post(f"/json/user_groups/{members_system_group.id}/deactivate")
self.assert_json_error(result, "Insufficient permission")
def test_user_group_deactivation_with_group_used_for_settings(self) -> None:
realm = get_realm("zulip")
hamlet = self.example_user("hamlet")
support_group = self.create_user_group_for_test(
"support", acting_user=self.example_user("othello")
)
moderators_group = NamedUserGroup.objects.get(
name=SystemGroups.MODERATORS, realm=realm, is_system_group=True
)
self.login("othello")
for setting_name in Realm.REALM_PERMISSION_GROUP_SETTINGS:
anonymous_setting_group = self.create_or_update_anonymous_group_for_setting(
[hamlet], [moderators_group, support_group]
)
do_change_realm_permission_group_setting(
realm, setting_name, anonymous_setting_group, acting_user=None
)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_error(result, "Cannot deactivate user group in use.")
data = orjson.loads(result.content)
self.assertEqual(data["objections"], [{"type": "realm", "settings": [setting_name]}])
do_change_realm_permission_group_setting(
realm, setting_name, support_group, acting_user=None
)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_error(result, "Cannot deactivate user group in use.")
data = orjson.loads(result.content)
self.assertEqual(data["objections"], [{"type": "realm", "settings": [setting_name]}])
# Reset the realm setting to one of the system group so this setting
# does not interfere when testing for another setting.
do_change_realm_permission_group_setting(
realm, setting_name, moderators_group, acting_user=None
)
stream = ensure_stream(realm, "support", acting_user=None)
self.login("desdemona")
for setting_name in Stream.stream_permission_group_settings:
do_change_stream_group_based_setting(
stream, setting_name, support_group, acting_user=None
)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_error(result, "Cannot deactivate user group in use.")
data = orjson.loads(result.content)
self.assertEqual(
data["objections"],
[{"type": "channel", "channel_id": stream.id, "settings": [setting_name]}],
)
# Test the group can be deactivated, if the stream which uses
# this group for a setting is deactivated.
do_deactivate_stream(stream, acting_user=None)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=realm)
self.assertTrue(support_group.deactivated)
support_group.deactivated = False
support_group.save()
do_unarchive_stream(stream, "support", acting_user=None)
anonymous_setting_group = self.create_or_update_anonymous_group_for_setting(
[hamlet], [moderators_group, support_group]
)
do_change_stream_group_based_setting(
stream, setting_name, anonymous_setting_group, acting_user=None
)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_error(result, "Cannot deactivate user group in use.")
data = orjson.loads(result.content)
self.assertEqual(
data["objections"],
[{"type": "channel", "channel_id": stream.id, "settings": [setting_name]}],
)
# Test the group can be deactivated, if the stream which uses
# this group for a setting is deactivated.
do_deactivate_stream(stream, acting_user=None)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=realm)
self.assertTrue(support_group.deactivated)
# Reactivate the group again for further testing.
support_group.deactivated = False
support_group.save()
# Reset the stream setting to one of the system group so this setting
# does not interfere when testing for another setting.
do_change_stream_group_based_setting(
stream, setting_name, moderators_group, acting_user=None
)
leadership_group = self.create_user_group_for_test(
"leadership", acting_user=self.example_user("othello")
)
for setting_name in NamedUserGroup.GROUP_PERMISSION_SETTINGS:
do_change_user_group_permission_setting(
leadership_group, setting_name, support_group, acting_user=None
)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_error(result, "Cannot deactivate user group in use.")
data = orjson.loads(result.content)
self.assertEqual(
data["objections"],
[
{
"type": "user_group",
"group_id": leadership_group.id,
"settings": [setting_name],
}
],
)
# Test the group can be deactivated, if the user group which uses
# this group for a setting is deactivated.
do_deactivate_user_group(leadership_group, acting_user=None)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=realm)
self.assertTrue(support_group.deactivated)
support_group.deactivated = False
support_group.save()
leadership_group.deactivated = False
leadership_group.save()
anonymous_setting_group = self.create_or_update_anonymous_group_for_setting(
[hamlet], [moderators_group, support_group]
)
do_change_user_group_permission_setting(
leadership_group, setting_name, anonymous_setting_group, acting_user=None
)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_error(result, "Cannot deactivate user group in use.")
data = orjson.loads(result.content)
self.assertEqual(
data["objections"],
[
{
"type": "user_group",
"group_id": leadership_group.id,
"settings": [setting_name],
}
],
)
# Test the group can be deactivated, if the user group which uses
# this group for a setting is deactivated.
do_deactivate_user_group(leadership_group, acting_user=None)
result = self.client_post(f"/json/user_groups/{support_group.id}/deactivate")
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=realm)
self.assertTrue(support_group.deactivated)
# Reactivate the group again for further testing.
support_group.deactivated = False
support_group.save()
leadership_group.deactivated = False
leadership_group.save()
# Reset the group setting to one of the system group so this setting
# does not interfere when testing for another setting.
do_change_user_group_permission_setting(
leadership_group, setting_name, moderators_group, acting_user=None
)
def test_query_counts(self) -> None:
hamlet = self.example_user("hamlet")
cordelia = self.example_user("cordelia")
realm = hamlet.realm
self.login_user(hamlet)
original_users = [
create_user(
email=f"original_user{i}@zulip.com",
password=None,
realm=realm,
full_name="full_name",
)
for i in range(50)
]
with self.assert_database_query_count(9):
user_group = create_user_group_in_database(
name="support",
members=[hamlet, cordelia, *original_users],
realm=realm,
acting_user=hamlet,
)
self.assert_user_membership(user_group, [hamlet, cordelia, *original_users])
new_users = [
create_user(
email=f"new_user{i}@zulip.com",
password=None,
realm=realm,
full_name="full_name",
)
for i in range(50)
]
new_user_ids = [user.id for user in new_users]
munge = lambda obj: orjson.dumps(obj).decode()
params = dict(add=munge(new_user_ids))
with (
mock.patch("zerver.views.user_groups.notify_for_user_group_subscription_changes"),
self.assert_database_query_count(14),
):
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_success(result)
with self.assert_database_query_count(1):
all_user_ids = get_user_group_member_ids(user_group, direct_member_only=True)
self.assert_length(all_user_ids, 102)
self.assert_user_membership(user_group, [hamlet, cordelia, *new_users, *original_users])
def test_update_members_of_user_group(self) -> None:
hamlet = self.example_user("hamlet")
self.login("hamlet")
params = {
"name": "support",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Support team",
}
self.client_post("/json/user_groups/create", info=params)
2024-04-18 12:23:46 +02:00
user_group = NamedUserGroup.objects.get(name="support")
# Test add members
self.assert_user_membership(user_group, [hamlet])
othello = self.example_user("othello")
# A bot
webhook_bot = self.example_user("webhook_bot")
# A deactivated user
iago = self.example_user("iago")
do_deactivate_user(iago, acting_user=None)
params = {"add": orjson.dumps([othello.id]).decode()}
initial_last_message = self.get_last_message()
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_success(result)
self.assert_user_membership(user_group, [hamlet, othello])
# A notification message is sent for adding to user group.
self.assertNotEqual(self.get_last_message(), initial_last_message)
expected_notification = (
f"{silent_mention_syntax_for_user(hamlet)} added you to the group @_*support*."
)
self.assertEqual(self.get_last_message().content, expected_notification)
# Test adding a member already there.
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_error(result, f"User {othello.id} is already a member of this group")
self.assert_user_membership(user_group, [hamlet, othello])
2023-06-28 00:32:16 +02:00
# Test user adding itself, bot and deactivated user to user group.
desdemona = self.example_user("desdemona")
self.login_user(desdemona)
params = {"add": orjson.dumps([desdemona.id, iago.id, webhook_bot.id]).decode()}
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_error(result, f"Invalid user ID: {iago.id}")
params = {"add": orjson.dumps([desdemona.id, webhook_bot.id]).decode()}
initial_last_message = self.get_last_message()
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_success(result)
self.assert_user_membership(user_group, [hamlet, othello, desdemona, webhook_bot])
# No notification message is sent for adding to user group.
self.assertEqual(self.get_last_message(), initial_last_message)
# For normal testing we again log in with hamlet
self.logout()
self.login_user(hamlet)
# Test remove members
params = {"delete": orjson.dumps([othello.id]).decode()}
initial_last_message = self.get_last_message()
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_success(result)
self.assert_user_membership(user_group, [hamlet, desdemona, webhook_bot])
# A notification message is sent for removing from user group.
self.assertNotEqual(self.get_last_message(), initial_last_message)
expected_notification = (
f"{silent_mention_syntax_for_user(hamlet)} removed you from the group @_*support*."
)
self.assertEqual(self.get_last_message().content, expected_notification)
# Test remove a member that's already removed
params = {"delete": orjson.dumps([othello.id]).decode()}
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_error(result, f"There is no member '{othello.id}' in this user group")
self.assert_user_membership(user_group, [hamlet, desdemona, webhook_bot])
# Test user remove itself,bot and deactivated user from user group.
desdemona = self.example_user("desdemona")
self.login_user(desdemona)
# Add user to group after reactivation to test removing deactivated user.
do_reactivate_user(iago, acting_user=None)
self.client_post(
f"/json/user_groups/{user_group.id}/members",
info={"add": orjson.dumps([iago.id]).decode()},
)
do_deactivate_user(iago, acting_user=None)
params = {"delete": orjson.dumps([iago.id, desdemona.id, webhook_bot.id]).decode()}
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_error(result, f"Invalid user ID: {iago.id}")
params = {"delete": orjson.dumps([desdemona.id, webhook_bot.id]).decode()}
initial_last_message = self.get_last_message()
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_success(result)
self.assert_user_membership(user_group, [hamlet])
# No notification message is sent for removing from user group.
self.assertEqual(self.get_last_message(), initial_last_message)
# Test adding and removing subgroups.
admins_group = NamedUserGroup.objects.get(
name=SystemGroups.ADMINISTRATORS, realm=hamlet.realm, is_system_group=True
)
cordelia = self.example_user("cordelia")
subgroup = check_add_user_group(
hamlet.realm, "leadership", [cordelia], acting_user=cordelia
)
params = {"add_subgroups": orjson.dumps([subgroup.id, admins_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_success(result)
self.assert_subgroup_membership(user_group, [subgroup, admins_group])
params = {"delete_subgroups": orjson.dumps([admins_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_success(result)
self.assert_subgroup_membership(user_group, [subgroup])
# Test when nothing is provided
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info={})
msg = 'Nothing to do. Specify at least one of "add", "delete", "add_subgroups" or "delete_subgroups".'
self.assert_json_error(result, msg)
self.assert_user_membership(user_group, [hamlet])
# Test adding or removing members from a deactivated group.
do_deactivate_user_group(user_group, acting_user=None)
params = {"delete": orjson.dumps([hamlet.id]).decode()}
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_error(result, "User group is deactivated.")
self.assert_user_membership(user_group, [hamlet])
params = {"add": orjson.dumps([iago.id]).decode()}
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_error(result, "User group is deactivated.")
self.assert_user_membership(user_group, [hamlet])
def test_mentions(self) -> None:
cordelia = self.example_user("cordelia")
hamlet = self.example_user("hamlet")
othello = self.example_user("othello")
zoe = self.example_user("ZOE")
realm = cordelia.realm
group_name = "support"
stream_name = "Dev help"
content_with_group_mention = "hey @*support* can you help us with this?"
ensure_stream(realm, stream_name, acting_user=None)
all_users = {cordelia, hamlet, othello, zoe}
support_team = {hamlet, zoe}
sender = cordelia
other_users = all_users - support_team
for user in all_users:
self.subscribe(user, stream_name)
check_add_user_group(
name=group_name, initial_members=list(support_team), realm=realm, acting_user=hamlet
)
payload = dict(
type="stream",
to=orjson.dumps(stream_name).decode(),
topic="whatever",
content=content_with_group_mention,
)
result = self.api_post(sender, "/api/v1/messages", payload)
self.assert_json_success(result)
for user in support_team:
um = most_recent_usermessage(user)
self.assertTrue(um.flags.mentioned)
for user in other_users:
um = most_recent_usermessage(user)
self.assertFalse(um.flags.mentioned)
def test_can_create_groups_for_creating_user_group(self) -> None:
hamlet = self.example_user("hamlet")
realm = hamlet.realm
aaron = self.example_user("aaron")
aaron_group = check_add_user_group(
get_realm("zulip"), "aaron_group", [aaron], acting_user=aaron
)
def check_create_user_group(acting_user: str, error_msg: str | None = None) -> None:
params = {
"name": "support",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Support Team",
}
result = self.api_post(
self.example_user(acting_user), "/api/v1/user_groups/create", info=params
)
if error_msg is None:
self.assert_json_success(result)
# One group already exists in the test database and we've created one
# more for testing just before running this function.
self.assert_length(NamedUserGroup.objects.filter(realm=realm), 11)
else:
self.assert_json_error(result, error_msg)
# Check only admins are allowed to create user group. Admins are allowed even if
# they are not a member of the group.
admins_group = NamedUserGroup.objects.get(name=SystemGroups.ADMINISTRATORS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_create_groups",
admins_group,
acting_user=None,
)
check_create_user_group("shiva", "Insufficient permission")
check_create_user_group("iago")
NamedUserGroup.objects.get(name="support", realm=realm).delete()
# Check moderators are allowed to create user group but not members. Moderators are
# allowed even if they are not a member of the group.
moderators_group = NamedUserGroup.objects.get(name=SystemGroups.MODERATORS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_create_groups",
moderators_group,
acting_user=None,
)
check_create_user_group("hamlet", "Insufficient permission")
check_create_user_group("shiva")
NamedUserGroup.objects.get(name="support", realm=realm).delete()
# Check if members of a NamedUserGroup are allowed to create user groups.
do_change_realm_permission_group_setting(
realm,
"can_create_groups",
aaron_group,
acting_user=None,
)
check_create_user_group("shiva", "Insufficient permission")
check_create_user_group("aaron")
NamedUserGroup.objects.get(name="support", realm=realm).delete()
# Check if members of an anonymous group are allowed to create user groups.
cordelia = self.example_user("cordelia")
anonymous_group = self.create_or_update_anonymous_group_for_setting(
[cordelia], [admins_group, moderators_group]
)
do_change_realm_permission_group_setting(
realm,
"can_create_groups",
anonymous_group,
acting_user=None,
)
check_create_user_group("aaron", "Insufficient permission")
check_create_user_group("cordelia")
NamedUserGroup.objects.get(name="support", realm=realm).delete()
check_create_user_group("shiva")
NamedUserGroup.objects.get(name="support", realm=realm).delete()
check_create_user_group("iago")
NamedUserGroup.objects.get(name="support", realm=realm).delete()
# Check only members are allowed to create the user group.
members_group = NamedUserGroup.objects.get(name=SystemGroups.MEMBERS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_create_groups",
members_group,
acting_user=None,
)
check_create_user_group("polonius", "Not allowed for guest users")
check_create_user_group("othello")
NamedUserGroup.objects.get(name="support", realm=realm).delete()
# Check only full members are allowed to create the user group.
full_members_group = NamedUserGroup.objects.get(name=SystemGroups.FULL_MEMBERS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_create_groups",
full_members_group,
acting_user=None,
)
do_set_realm_property(realm, "waiting_period_threshold", 10, acting_user=None)
othello = self.example_user("othello")
othello.date_joined = timezone_now() - timedelta(days=9)
othello.save()
check_create_user_group("othello", "Insufficient permission")
othello.date_joined = timezone_now() - timedelta(days=11)
othello.save()
promote_new_full_members()
check_create_user_group("othello")
def test_realm_level_setting_for_updating_user_groups(self) -> None:
othello = self.example_user("othello")
self.login("othello")
params = {
"name": "support",
"members": orjson.dumps([othello.id]).decode(),
"description": "Support team",
}
self.client_post("/json/user_groups/create", info=params)
2024-04-18 12:23:46 +02:00
user_group = NamedUserGroup.objects.get(name="support")
def check_update_user_group(
new_name: str,
new_description: str,
acting_user: str,
error_msg: str | None = None,
) -> None:
params = {
"name": new_name,
"description": new_description,
}
# Ensure that this update request is not a no-op.
self.assertNotEqual(user_group.name, new_name)
self.assertNotEqual(user_group.description, new_description)
result = self.api_patch(
self.example_user(acting_user), f"/api/v1/user_groups/{user_group.id}", info=params
)
if error_msg is None:
self.assert_json_success(result)
user_group.refresh_from_db()
self.assertEqual(user_group.name, new_name)
self.assertEqual(user_group.description, new_description)
else:
self.assert_json_error(result, error_msg)
realm = othello.realm
# Check only admins are allowed to update user group. Admins are allowed even if
# they are not a member of the group.
admins_group = NamedUserGroup.objects.get(name=SystemGroups.ADMINISTRATORS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
admins_group,
acting_user=None,
)
check_update_user_group("help", "Troubleshooting team", "shiva", "Insufficient permission")
check_update_user_group("help", "Troubleshooting team", "iago")
# Check moderators are allowed to update user group but not members. Moderators are
# allowed even if they are not a member of the group.
moderators_group = NamedUserGroup.objects.get(name=SystemGroups.MODERATORS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
moderators_group,
acting_user=None,
)
check_update_user_group("support", "Support team", "hamlet", "Insufficient permission")
check_update_user_group("support1", "Support team - test", "iago")
check_update_user_group("support", "Support team", "othello")
# Check only members are allowed to update the user group.
members_group = NamedUserGroup.objects.get(name=SystemGroups.MEMBERS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
members_group,
acting_user=None,
)
check_update_user_group(
"help", "Troubleshooting team", "polonius", "Not allowed for guest users"
)
check_update_user_group("help", "Troubleshooting team", "cordelia")
# Check user who is member of a subgroup of the group being updated
# can also update the group.
cordelia = self.example_user("cordelia")
subgroup = check_add_user_group(realm, "leadership", [cordelia], acting_user=cordelia)
add_subgroups_to_user_group(user_group, [subgroup], acting_user=None)
check_update_user_group(
"support",
"Support team",
"cordelia",
)
# Check only full members are allowed to update the user group and only if belong to the
# user group.
full_members_group = NamedUserGroup.objects.get(name=SystemGroups.FULL_MEMBERS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
full_members_group,
acting_user=None,
)
do_set_realm_property(realm, "waiting_period_threshold", 10, acting_user=None)
aaron = self.example_user("aaron")
aaron.date_joined = timezone_now() - timedelta(days=9)
aaron.save()
cordelia = self.example_user("cordelia")
cordelia.date_joined = timezone_now() - timedelta(days=11)
cordelia.save()
promote_new_full_members()
check_update_user_group(
"help",
"Troubleshooting team",
"cordelia",
)
check_update_user_group("support", "Support team", "aaron", "Insufficient permission")
othello.date_joined = timezone_now() - timedelta(days=11)
othello.save()
promote_new_full_members()
check_update_user_group("support", "Support team", "othello")
def test_group_level_setting_for_updating_user_groups(self) -> None:
othello = self.example_user("othello")
iago = self.example_user("iago")
user_group = check_add_user_group(
get_realm("zulip"), "support", [othello, iago], acting_user=othello
)
hamlet = self.example_user("hamlet")
leadership_group = check_add_user_group(
get_realm("zulip"), "leadership", [hamlet], acting_user=hamlet
)
def check_update_user_group(
new_name: str,
new_description: str,
acting_user: str,
error_msg: str | None = None,
) -> None:
params = {
"name": new_name,
"description": new_description,
}
# Ensure that this update request is not a no-op.
self.assertNotEqual(user_group.name, new_name)
self.assertNotEqual(user_group.description, new_description)
result = self.api_patch(
self.example_user(acting_user), f"/api/v1/user_groups/{user_group.id}", info=params
)
if error_msg is None:
self.assert_json_success(result)
user_group.refresh_from_db()
self.assertEqual(user_group.name, new_name)
self.assertEqual(user_group.description, new_description)
else:
self.assert_json_error(result, error_msg)
realm = othello.realm
nobody_group = NamedUserGroup.objects.get(name=SystemGroups.NOBODY, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
nobody_group,
acting_user=None,
)
# Default value of can_manage_group is "Nobody" system group.
check_update_user_group("help", "Troubleshooting team", "iago", "Insufficient permission")
check_update_user_group("help", "Troubleshooting team", "aaron", "Insufficient permission")
system_group_dict = get_role_based_system_groups_dict(realm)
owners_group = system_group_dict[SystemGroups.OWNERS]
do_change_user_group_permission_setting(
user_group, "can_manage_group", owners_group, acting_user=None
)
check_update_user_group("help", "Troubleshooting team", "iago", "Insufficient permission")
check_update_user_group("help", "Troubleshooting team", "desdemona")
user_group.can_manage_group = system_group_dict[SystemGroups.MEMBERS]
user_group.save()
check_update_user_group(
"support", "Support team", "polonius", "Not allowed for guest users"
)
check_update_user_group(
"support",
"Support team",
"cordelia",
)
setting_group = self.create_or_update_anonymous_group_for_setting(
[self.example_user("cordelia")], [leadership_group, owners_group]
)
do_change_user_group_permission_setting(
user_group, "can_manage_group", setting_group, acting_user=None
)
check_update_user_group("help", "Troubleshooting team", "iago", "Insufficient permission")
check_update_user_group("help", "Troubleshooting team", "hamlet")
check_update_user_group(
"support",
"Support team",
"cordelia",
)
check_update_user_group("help", "Troubleshooting team", "desdemona")
def test_realm_level_setting_for_updating_members(self) -> None:
user_group = self.create_user_group_for_test(
"support", acting_user=self.example_user("desdemona")
)
aaron = self.example_user("aaron")
othello = self.example_user("othello")
cordelia = self.example_user("cordelia")
def check_adding_members_to_group(acting_user: str, error_msg: str | None = None) -> None:
params = {"add": orjson.dumps([aaron.id]).decode()}
self.assert_user_membership(user_group, [othello])
result = self.api_post(
self.example_user(acting_user),
f"/api/v1/user_groups/{user_group.id}/members",
info=params,
)
if error_msg is None:
self.assert_json_success(result)
self.assert_user_membership(user_group, [aaron, othello])
else:
self.assert_json_error(result, error_msg)
def check_removing_members_from_group(
acting_user: str, error_msg: str | None = None
) -> None:
params = {"delete": orjson.dumps([aaron.id]).decode()}
self.assert_user_membership(user_group, [aaron, othello])
result = self.api_post(
self.example_user(acting_user),
f"/api/v1/user_groups/{user_group.id}/members",
info=params,
)
if error_msg is None:
self.assert_json_success(result)
self.assert_user_membership(user_group, [othello])
else:
self.assert_json_error(result, error_msg)
realm = get_realm("zulip")
# Check only admins are allowed to add/remove users from the group.
admins_group = NamedUserGroup.objects.get(name=SystemGroups.ADMINISTRATORS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
admins_group,
acting_user=None,
)
check_adding_members_to_group("shiva", "Insufficient permission")
check_adding_members_to_group("iago")
check_removing_members_from_group("shiva", "Insufficient permission")
check_removing_members_from_group("iago")
# Check moderators are allowed to add/remove users from the group but not members.
moderators_group = NamedUserGroup.objects.get(name=SystemGroups.MODERATORS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
moderators_group,
acting_user=None,
)
check_adding_members_to_group("cordelia", "Insufficient permission")
check_adding_members_to_group("shiva")
check_removing_members_from_group("hamlet", "Insufficient permission")
check_removing_members_from_group("shiva")
# Check if members of a NamedUserGroup are allowed to add/remove members.
othello_group = check_add_user_group(
get_realm("zulip"), "othello_group", [othello], acting_user=othello
)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
othello_group,
acting_user=None,
)
check_adding_members_to_group("shiva", "Insufficient permission")
check_adding_members_to_group("othello")
check_removing_members_from_group("shiva", "Insufficient permission")
check_removing_members_from_group("othello")
# Check if members of an anonymous group are allowed to add/remove members.
anonymous_group = self.create_or_update_anonymous_group_for_setting(
[othello], [admins_group, moderators_group]
)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
anonymous_group,
acting_user=None,
)
check_adding_members_to_group("cordelia", "Insufficient permission")
check_adding_members_to_group("shiva")
check_removing_members_from_group("hamlet", "Insufficient permission")
check_removing_members_from_group("shiva")
check_adding_members_to_group("iago")
check_removing_members_from_group("iago")
check_adding_members_to_group("othello")
check_removing_members_from_group("othello")
# Check only members are allowed to add/remove users in the group.
members_group = NamedUserGroup.objects.get(name=SystemGroups.MEMBERS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
members_group,
acting_user=None,
)
check_adding_members_to_group("polonius", "Not allowed for guest users")
# User with role member but not part of the target group should
# be allowed to add members to the group if they are part of
# `can_manage_all_groups`.
check_adding_members_to_group("cordelia")
check_removing_members_from_group("cordelia")
check_adding_members_to_group("othello")
check_removing_members_from_group("polonius", "Not allowed for guest users")
check_removing_members_from_group("othello")
# Check only full members are allowed to add/remove users in the group.
full_members_group = NamedUserGroup.objects.get(name=SystemGroups.FULL_MEMBERS, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
full_members_group,
acting_user=None,
)
do_set_realm_property(realm, "waiting_period_threshold", 10, acting_user=None)
othello.date_joined = timezone_now() - timedelta(days=9)
othello.save()
promote_new_full_members()
check_adding_members_to_group("cordelia", "Insufficient permission")
cordelia.date_joined = timezone_now() - timedelta(days=11)
cordelia.save()
promote_new_full_members()
# Full members who are not part of the target group should
# be allowed to add members to the group if they are part of
# `can_manage_all_groups`.
check_adding_members_to_group("cordelia")
check_removing_members_from_group("cordelia")
othello.date_joined = timezone_now() - timedelta(days=11)
othello.save()
promote_new_full_members()
check_adding_members_to_group("othello")
othello.date_joined = timezone_now() - timedelta(days=9)
othello.save()
promote_new_full_members()
check_removing_members_from_group("othello", "Insufficient permission")
othello.date_joined = timezone_now() - timedelta(days=11)
othello.save()
promote_new_full_members()
check_removing_members_from_group("othello")
def test_group_level_setting_for_adding_members(self) -> None:
othello = self.example_user("othello")
user_group = check_add_user_group(
get_realm("zulip"), "support", [othello], acting_user=self.example_user("desdemona")
)
hamlet = self.example_user("hamlet")
leadership_group = check_add_user_group(
get_realm("zulip"), "leadership", [hamlet], acting_user=hamlet
)
aaron = self.example_user("aaron")
def check_adding_members_to_group(acting_user: str, error_msg: str | None = None) -> None:
params = {"add": orjson.dumps([aaron.id]).decode()}
self.assert_user_membership(user_group, [othello])
result = self.api_post(
self.example_user(acting_user),
f"/api/v1/user_groups/{user_group.id}/members",
info=params,
)
if error_msg is None:
self.assert_json_success(result)
self.assert_user_membership(user_group, [aaron, othello])
else:
self.assert_json_error(result, error_msg)
realm = get_realm("zulip")
nobody_group = NamedUserGroup.objects.get(name=SystemGroups.NOBODY, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
nobody_group,
acting_user=None,
)
# Default value of can_add_members_group is "group_creator".
check_adding_members_to_group("iago", "Insufficient permission")
check_adding_members_to_group("desdemona")
bulk_remove_members_from_user_groups([user_group], [aaron.id], acting_user=None)
# Remove aaron from group to add them again in further tests.
bulk_remove_members_from_user_groups([user_group], [aaron.id], acting_user=None)
# Test setting can_add_members_group to a system group.
system_group_dict = get_role_based_system_groups_dict(realm)
owners_group = system_group_dict[SystemGroups.OWNERS]
do_change_user_group_permission_setting(
user_group, "can_manage_group", nobody_group, acting_user=None
)
do_change_user_group_permission_setting(
user_group, "can_add_members_group", owners_group, acting_user=None
)
check_adding_members_to_group("iago", "Insufficient permission")
check_adding_members_to_group("desdemona")
bulk_remove_members_from_user_groups([user_group], [aaron.id], acting_user=None)
# Although we can't set this value to everyone via the API,
# it is a good way here to test whether guest users are allowed
# to take the action or not.
everyone_group = system_group_dict[SystemGroups.EVERYONE]
do_change_user_group_permission_setting(
user_group, "can_add_members_group", everyone_group, acting_user=None
)
check_adding_members_to_group("polonius", "Not allowed for guest users")
check_adding_members_to_group("cordelia")
bulk_remove_members_from_user_groups([user_group], [aaron.id], acting_user=None)
# Test setting can_add_members_group to an anonymous group with
# subgroups.
setting_group = self.create_or_update_anonymous_group_for_setting(
[self.example_user("cordelia")], [leadership_group, owners_group]
)
do_change_user_group_permission_setting(
user_group, "can_add_members_group", setting_group, acting_user=None
)
check_adding_members_to_group("iago", "Insufficient permission")
check_adding_members_to_group("hamlet")
bulk_remove_members_from_user_groups([user_group], [aaron.id], acting_user=None)
check_adding_members_to_group("cordelia")
bulk_remove_members_from_user_groups([user_group], [aaron.id], acting_user=None)
check_adding_members_to_group("desdemona")
bulk_remove_members_from_user_groups([user_group], [aaron.id], acting_user=None)
# If user is part of `can_manage_group`, they need not be part
# of `can_add_members_group` to add members.
othello_group = self.create_or_update_anonymous_group_for_setting([othello], [])
hamlet_group = self.create_or_update_anonymous_group_for_setting([hamlet], [])
do_change_user_group_permission_setting(
user_group, "can_manage_group", othello_group, acting_user=None
)
do_change_user_group_permission_setting(
user_group, "can_add_members_group", hamlet_group, acting_user=None
)
check_adding_members_to_group("othello")
bulk_remove_members_from_user_groups([user_group], [aaron.id], acting_user=None)
def test_group_level_setting_for_removing_members(self) -> None:
othello = self.example_user("othello")
user_group = check_add_user_group(
get_realm("zulip"), "support", [othello], acting_user=self.example_user("desdemona")
)
hamlet = self.example_user("hamlet")
leadership_group = check_add_user_group(
get_realm("zulip"), "leadership", [hamlet], acting_user=hamlet
)
aaron = self.example_user("aaron")
def check_removing_members_from_group(
acting_user: str, error_msg: str | None = None
) -> None:
params = {"delete": orjson.dumps([aaron.id]).decode()}
self.assert_user_membership(user_group, [aaron, othello])
result = self.api_post(
self.example_user(acting_user),
f"/api/v1/user_groups/{user_group.id}/members",
info=params,
)
if error_msg is None:
self.assert_json_success(result)
self.assert_user_membership(user_group, [othello])
else:
self.assert_json_error(result, error_msg)
realm = get_realm("zulip")
nobody_group = NamedUserGroup.objects.get(name=SystemGroups.NOBODY, realm=realm)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
nobody_group,
acting_user=None,
)
# Default value of can_add_members_group is "group_creator".
bulk_add_members_to_user_groups([user_group], [aaron.id], acting_user=None)
check_removing_members_from_group("iago", "Insufficient permission")
check_removing_members_from_group("desdemona")
# Test setting can_add_members_group to a system group.
system_group_dict = get_role_based_system_groups_dict(realm)
owners_group = system_group_dict[SystemGroups.OWNERS]
do_change_user_group_permission_setting(
user_group, "can_manage_group", owners_group, acting_user=None
)
bulk_add_members_to_user_groups([user_group], [aaron.id], acting_user=None)
check_removing_members_from_group("iago", "Insufficient permission")
check_removing_members_from_group("desdemona")
# Although we can't set this value to everyone via the API,
# it is a good way here to test whether guest users are allowed
# to take the action or not.
everyone_group = system_group_dict[SystemGroups.EVERYONE]
do_change_user_group_permission_setting(
user_group, "can_manage_group", everyone_group, acting_user=None
)
bulk_add_members_to_user_groups([user_group], [aaron.id], acting_user=None)
check_removing_members_from_group("polonius", "Not allowed for guest users")
check_removing_members_from_group("cordelia")
# Test setting can_manage_group to an anonymous group with
# subgroups.
setting_group = self.create_or_update_anonymous_group_for_setting(
[self.example_user("cordelia")], [leadership_group, owners_group]
)
do_change_user_group_permission_setting(
user_group, "can_manage_group", setting_group, acting_user=None
)
bulk_add_members_to_user_groups([user_group], [aaron.id], acting_user=None)
check_removing_members_from_group("iago", "Insufficient permission")
check_removing_members_from_group("hamlet")
bulk_add_members_to_user_groups([user_group], [aaron.id], acting_user=None)
check_removing_members_from_group("cordelia")
bulk_add_members_to_user_groups([user_group], [aaron.id], acting_user=None)
check_removing_members_from_group("desdemona")
def test_adding_yourself_to_group(self) -> None:
realm = get_realm("zulip")
othello = self.example_user("othello")
user_group = check_add_user_group(realm, "support", [othello], acting_user=othello)
nobody_group = NamedUserGroup.objects.get(
name=SystemGroups.NOBODY, realm=realm, is_system_group=True
)
# Set permissions to manage the group and adding others to group
# to nobody to test can_join_group in isolation.
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
nobody_group,
acting_user=None,
)
do_change_user_group_permission_setting(
user_group,
"can_manage_group",
nobody_group,
acting_user=None,
)
do_change_user_group_permission_setting(
user_group,
"can_add_members_group",
nobody_group,
acting_user=None,
)
def check_adding_yourself_to_group(acting_user: str, error_msg: str | None = None) -> None:
user = self.example_user(acting_user)
self.assert_user_membership(user_group, [othello])
params = {"add": orjson.dumps([user.id]).decode()}
result = self.api_post(
user,
f"/api/v1/user_groups/{user_group.id}/members",
info=params,
)
if error_msg is not None:
self.assert_json_error(result, error_msg)
self.assert_user_membership(user_group, [othello])
else:
self.assert_json_success(result)
self.assert_user_membership(user_group, [othello, user])
# Remove the added user again for further tests.
bulk_remove_members_from_user_groups([user_group], [user.id], acting_user=None)
admins_group = NamedUserGroup.objects.get(
name=SystemGroups.ADMINISTRATORS, realm=realm, is_system_group=True
)
do_change_user_group_permission_setting(
user_group,
"can_join_group",
admins_group.usergroup_ptr,
acting_user=None,
)
check_adding_yourself_to_group("shiva", "Insufficient permission")
check_adding_yourself_to_group("iago")
check_adding_yourself_to_group("desdemona")
# Test with setting set to a user defined group.
hamlet = self.example_user("hamlet")
cordelia = self.example_user("cordelia")
leadership_group = check_add_user_group(
realm, "leadership", [hamlet, cordelia], acting_user=hamlet
)
do_change_user_group_permission_setting(
user_group,
"can_join_group",
leadership_group.usergroup_ptr,
acting_user=None,
)
check_adding_yourself_to_group("iago", "Insufficient permission")
check_adding_yourself_to_group("hamlet")
# Test with setting set to an anonymous group.
shiva = self.example_user("shiva")
setting_group = self.create_or_update_anonymous_group_for_setting(
[shiva], [leadership_group]
)
do_change_user_group_permission_setting(
user_group,
"can_join_group",
setting_group,
acting_user=None,
)
check_adding_yourself_to_group("iago", "Insufficient permission")
check_adding_yourself_to_group("cordelia")
check_adding_yourself_to_group("shiva")
# If user is allowed to add anyone, then they can join themselves
# even when can_join_group setting does not allow them to do so.
do_change_user_group_permission_setting(
user_group,
"can_join_group",
nobody_group,
acting_user=None,
)
self.assertEqual(user_group.can_join_group.named_user_group, nobody_group)
check_adding_yourself_to_group("iago", "Insufficient permission")
do_change_user_group_permission_setting(
user_group,
"can_add_members_group",
admins_group,
acting_user=None,
)
check_adding_yourself_to_group("iago")
# If user is allowed to manage the group, then they can join themselves
# even when can_join_group and can_add_members_group does not allow them.
do_change_user_group_permission_setting(
user_group,
"can_add_members_group",
nobody_group,
acting_user=None,
)
self.assertEqual(user_group.can_add_members_group.named_user_group, nobody_group)
check_adding_yourself_to_group("iago", "Insufficient permission")
do_change_user_group_permission_setting(
user_group,
"can_manage_group",
admins_group,
acting_user=None,
)
check_adding_yourself_to_group("iago")
do_change_user_group_permission_setting(
user_group,
"can_manage_group",
nobody_group,
acting_user=None,
)
self.assertEqual(realm.can_manage_all_groups.named_user_group, nobody_group)
check_adding_yourself_to_group("iago", "Insufficient permission")
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
admins_group,
acting_user=None,
)
check_adding_yourself_to_group("iago")
def test_leaving_a_group(self) -> None:
realm = get_realm("zulip")
othello = self.example_user("othello")
user_group = check_add_user_group(realm, "support", [othello], acting_user=othello)
nobody_group = NamedUserGroup.objects.get(
name=SystemGroups.NOBODY, realm=realm, is_system_group=True
)
# Set manage permissions to nobody to test can_leave_group in
# isolation.
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
nobody_group,
acting_user=None,
)
do_change_user_group_permission_setting(
user_group,
"can_manage_group",
nobody_group,
acting_user=None,
)
def check_leaving_a_group(acting_user: str, error_msg: str | None = None) -> None:
user = self.example_user(acting_user)
bulk_add_members_to_user_groups([user_group], [user.id], acting_user=None)
params = {"delete": orjson.dumps([user.id]).decode()}
result = self.api_post(
user,
f"/api/v1/user_groups/{user_group.id}/members",
info=params,
)
if error_msg is not None:
self.assert_json_error(result, error_msg)
self.assert_user_membership(user_group, [user, othello])
# Remove the user for the next test.
bulk_remove_members_from_user_groups([user_group], [user.id], acting_user=None)
else:
self.assert_json_success(result)
self.assert_member_not_in_group(user_group, user)
admins_group = NamedUserGroup.objects.get(
name=SystemGroups.ADMINISTRATORS, realm=realm, is_system_group=True
)
do_change_user_group_permission_setting(
user_group,
"can_leave_group",
admins_group,
acting_user=None,
)
check_leaving_a_group("shiva", "Insufficient permission")
check_leaving_a_group("iago")
check_leaving_a_group("desdemona")
# Test with setting set to a user defined group.
hamlet = self.example_user("hamlet")
cordelia = self.example_user("cordelia")
leadership_group = check_add_user_group(
realm, "leadership", [hamlet, cordelia], acting_user=hamlet
)
do_change_user_group_permission_setting(
user_group,
"can_leave_group",
leadership_group,
acting_user=None,
)
check_leaving_a_group("iago", "Insufficient permission")
check_leaving_a_group("hamlet")
# Test with setting set to an anonymous group.
shiva = self.example_user("shiva")
setting_group = self.create_or_update_anonymous_group_for_setting(
[shiva], [leadership_group]
)
do_change_user_group_permission_setting(
user_group,
"can_leave_group",
setting_group,
acting_user=None,
)
check_leaving_a_group("iago", "Insufficient permission")
check_leaving_a_group("cordelia")
check_leaving_a_group("shiva")
# If user is allowed to manage a group, then they can leave
# even when can_leave_group does not allow them to do so.
do_change_user_group_permission_setting(
user_group,
"can_leave_group",
nobody_group,
acting_user=None,
)
do_change_user_group_permission_setting(
user_group,
"can_manage_group",
admins_group,
acting_user=None,
)
check_leaving_a_group("iago")
# If user is allowed to manage all groups, then they can leave
# even when can_leave_group does not allow them to do so.
owners_group = NamedUserGroup.objects.get(
name=SystemGroups.OWNERS, realm=realm, is_system_group=True
)
do_change_user_group_permission_setting(
user_group,
"can_manage_group",
nobody_group,
acting_user=None,
)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
owners_group,
acting_user=None,
)
check_leaving_a_group("desdemona")
def test_editing_system_user_groups(self) -> None:
desdemona = self.example_user("desdemona")
iago = self.example_user("iago")
othello = self.example_user("othello")
aaron = self.example_user("aaron")
2024-04-18 12:23:46 +02:00
user_group = NamedUserGroup.objects.get(
realm=iago.realm, name=SystemGroups.FULL_MEMBERS, is_system_group=True
)
def check_support_group_permission(acting_user: UserProfile) -> None:
self.login_user(acting_user)
params = {
"name": "Full members user group",
"description": "Full members system user group.",
}
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
self.assert_json_error(result, "Insufficient permission")
params = {"add": orjson.dumps([aaron.id]).decode()}
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_error(result, "Insufficient permission")
params = {"delete": orjson.dumps([othello.id]).decode()}
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_error(result, "Insufficient permission")
check_support_group_permission(desdemona)
check_support_group_permission(iago)
check_support_group_permission(othello)
def test_promote_new_full_members(self) -> None:
realm = get_realm("zulip")
cordelia = self.example_user("cordelia")
hamlet = self.example_user("hamlet")
cordelia.date_joined = timezone_now() - timedelta(days=11)
cordelia.save()
hamlet.date_joined = timezone_now() - timedelta(days=8)
hamlet.save()
do_set_realm_property(realm, "waiting_period_threshold", 10, acting_user=None)
2024-04-18 12:23:46 +02:00
full_members_group = NamedUserGroup.objects.get(
realm=realm, name=SystemGroups.FULL_MEMBERS, is_system_group=True
)
self.assertTrue(
UserGroupMembership.objects.filter(
user_profile=cordelia, user_group=full_members_group
).exists()
)
self.assertFalse(
UserGroupMembership.objects.filter(
user_profile=hamlet, user_group=full_members_group
).exists()
)
current_time = timezone_now()
with time_machine.travel((current_time + timedelta(days=3)), tick=False):
promote_new_full_members()
self.assertTrue(
UserGroupMembership.objects.filter(
user_profile=cordelia, user_group=full_members_group
).exists()
)
self.assertTrue(
UserGroupMembership.objects.filter(
user_profile=hamlet, user_group=full_members_group
).exists()
)
def test_updating_subgroups_of_user_group(self) -> None:
realm = get_realm("zulip")
desdemona = self.example_user("desdemona")
iago = self.example_user("iago")
hamlet = self.example_user("hamlet")
othello = self.example_user("othello")
leadership_group = check_add_user_group(
realm, "leadership", [desdemona, iago, hamlet], acting_user=desdemona
)
support_group = check_add_user_group(
realm, "support", [hamlet, othello], acting_user=hamlet
)
test_group = check_add_user_group(realm, "test", [hamlet], acting_user=hamlet)
self.login("hamlet")
# Group creator can add or remove subgroups as they are member of can_manage_group.
params = {"add": orjson.dumps([leadership_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{support_group.id}/subgroups", info=params)
self.assert_json_success(result)
self.assert_subgroup_membership(support_group, [leadership_group])
params = {"delete": orjson.dumps([leadership_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{support_group.id}/subgroups", info=params)
self.assert_json_success(result)
self.assert_subgroup_membership(support_group, [])
result = self.client_post(f"/json/user_groups/{support_group.id}/subgroups", info=params)
self.assert_json_error(
result,
f"User group {leadership_group.id} is not a subgroup of this group.",
)
self.assert_subgroup_membership(support_group, [])
params = {"add": orjson.dumps([leadership_group.id]).decode()}
self.client_post(f"/json/user_groups/{support_group.id}/subgroups", info=params)
self.assert_subgroup_membership(support_group, [leadership_group])
result = self.client_post(f"/json/user_groups/{support_group.id}/subgroups", info=params)
self.assert_json_error(
result,
f"User group {leadership_group.id} is already a subgroup of this group.",
)
self.assert_subgroup_membership(support_group, [leadership_group])
self.login("desdemona")
params = {"add": orjson.dumps([support_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{leadership_group.id}/subgroups", info=params)
self.assert_json_error(
result,
f"User group {leadership_group.id} is already a subgroup of one of the passed subgroups.",
)
self.assert_subgroup_membership(support_group, [leadership_group])
params = {"add": orjson.dumps([support_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{test_group.id}/subgroups", info=params)
self.assert_json_success(result)
self.assert_subgroup_membership(test_group, [support_group])
params = {"add": orjson.dumps([test_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{leadership_group.id}/subgroups", info=params)
self.assert_json_error(
result,
f"User group {leadership_group.id} is already a subgroup of one of the passed subgroups.",
)
self.assert_subgroup_membership(test_group, [support_group])
lear_realm = get_realm("lear")
lear_cordelia = self.lear_user("cordelia")
lear_test_group = check_add_user_group(
lear_realm, "test", [lear_cordelia], acting_user=lear_cordelia
)
result = self.client_post(f"/json/user_groups/{lear_test_group.id}/subgroups", info=params)
self.assert_json_error(result, "Invalid user group")
self.assert_subgroup_membership(lear_test_group, [])
# Invalid subgroup id will raise an error.
params = {"add": orjson.dumps([leadership_group.id, 1111]).decode()}
result = self.client_post(f"/json/user_groups/{support_group.id}/subgroups", info=params)
self.assert_json_error(result, "Invalid user group ID: 1111")
self.assert_subgroup_membership(support_group, [leadership_group])
# Test when nothing is provided
result = self.client_post(f"/json/user_groups/{support_group.id}/subgroups", info={})
self.assert_json_error(result, 'Nothing to do. Specify at least one of "add" or "delete".')
self.assert_subgroup_membership(support_group, [leadership_group])
# Do not have support group as subgroup of any group to follow
# the condition a group used as a subgroup cannot be deactivated.
params = {"delete": orjson.dumps([support_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{test_group.id}/subgroups", info=params)
self.assert_json_success(result)
self.assert_subgroup_membership(test_group, [])
# Test adding or removing subgroups from a deactivated group.
do_deactivate_user_group(support_group, acting_user=None)
params = {"delete": orjson.dumps([leadership_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{support_group.id}/subgroups", info=params)
self.assert_json_error(result, "User group is deactivated.")
self.assert_subgroup_membership(support_group, [leadership_group])
params = {"add": orjson.dumps([test_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{support_group.id}/subgroups", info=params)
self.assert_json_error(result, "User group is deactivated.")
self.assert_subgroup_membership(support_group, [leadership_group])
# Test that a deactivated group cannot be used as a subgroup.
params = {"add": orjson.dumps([support_group.id]).decode()}
result = self.client_post(f"/json/user_groups/{test_group.id}/subgroups", info=params)
self.assert_json_error(result, "User group is deactivated.")
def test_permission_to_add_subgroups_to_group(self) -> None:
realm = get_realm("zulip")
hamlet = self.example_user("hamlet")
othello = self.example_user("othello")
leadership_group = check_add_user_group(realm, "leadership", [othello], acting_user=othello)
support_group = check_add_user_group(realm, "support", [hamlet], acting_user=hamlet)
def check_adding_subgroups_to_group(acting_user: str, error_msg: str | None = None) -> None:
params = {"add": orjson.dumps([leadership_group.id]).decode()}
self.assert_subgroup_membership(support_group, [])
result = self.api_post(
self.example_user(acting_user),
f"/api/v1/user_groups/{support_group.id}/subgroups",
info=params,
)
if error_msg is None:
self.assert_json_success(result)
self.assert_subgroup_membership(support_group, [leadership_group])
# Remove the subgroup to test further cases.
remove_subgroups_from_user_group(
support_group, [leadership_group], acting_user=None
)
else:
self.assert_json_error(result, error_msg)
nobody_group = NamedUserGroup.objects.get(
name=SystemGroups.NOBODY, realm=realm, is_system_group=True
)
# Set manage permissions to "Nobody" group to test permission
# with can_add_members_group.
do_change_user_group_permission_setting(
support_group,
"can_manage_group",
nobody_group,
acting_user=None,
)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
nobody_group,
acting_user=None,
)
do_change_user_group_permission_setting(
support_group,
"can_add_members_group",
nobody_group,
acting_user=None,
)
check_adding_subgroups_to_group("desdemona", "Insufficient permission")
owners_group = NamedUserGroup.objects.get(
name=SystemGroups.OWNERS, realm=realm, is_system_group=True
)
do_change_user_group_permission_setting(
support_group,
"can_add_members_group",
owners_group,
acting_user=None,
)
check_adding_subgroups_to_group("iago", "Insufficient permission")
check_adding_subgroups_to_group("desdemona")
# Test case when setting is set to a non-system group.
prospero = self.example_user("prospero")
test_group = check_add_user_group(realm, "test", [prospero], acting_user=prospero)
do_change_user_group_permission_setting(
support_group,
"can_add_members_group",
test_group,
acting_user=None,
)
check_adding_subgroups_to_group("desdemona", "Insufficient permission")
check_adding_subgroups_to_group("prospero")
# Test case when setting is set to an anonymous group.
setting_group = self.create_or_update_anonymous_group_for_setting(
direct_members=[othello],
direct_subgroups=[owners_group],
)
do_change_user_group_permission_setting(
support_group,
"can_add_members_group",
setting_group,
acting_user=None,
)
check_adding_subgroups_to_group("prospero", "Insufficient permission")
check_adding_subgroups_to_group("iago", "Insufficient permission")
check_adding_subgroups_to_group("desdemona")
check_adding_subgroups_to_group("othello")
# Set can_add_members_group setting to nobody, so we can test
# managing permissions as well.
do_change_user_group_permission_setting(
support_group,
"can_add_members_group",
nobody_group,
acting_user=None,
)
# Check permission as per can_manage_group setting.
setting_group = self.create_or_update_anonymous_group_for_setting(
direct_members=[othello],
direct_subgroups=[owners_group],
)
do_change_user_group_permission_setting(
support_group,
"can_manage_group",
setting_group,
acting_user=None,
)
check_adding_subgroups_to_group("iago", "Insufficient permission")
check_adding_subgroups_to_group("desdemona")
check_adding_subgroups_to_group("othello")
# Check permission as per can_manage_all_groups setting.
do_change_user_group_permission_setting(
support_group,
"can_manage_group",
nobody_group,
acting_user=None,
)
setting_group = self.create_or_update_anonymous_group_for_setting(
direct_members=[othello],
direct_subgroups=[owners_group],
)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
setting_group,
acting_user=None,
)
check_adding_subgroups_to_group("iago", "Insufficient permission")
check_adding_subgroups_to_group("desdemona")
check_adding_subgroups_to_group("othello")
def test_permission_to_remove_subgroups_from_group(self) -> None:
realm = get_realm("zulip")
hamlet = self.example_user("hamlet")
othello = self.example_user("othello")
leadership_group = check_add_user_group(realm, "leadership", [othello], acting_user=othello)
support_group = check_add_user_group(realm, "support", [hamlet], acting_user=hamlet)
add_subgroups_to_user_group(support_group, [leadership_group], acting_user=None)
def check_remove_subgroups_from_group(
acting_user: str, error_msg: str | None = None
) -> None:
params = {"delete": orjson.dumps([leadership_group.id]).decode()}
self.assert_subgroup_membership(support_group, [leadership_group])
result = self.api_post(
self.example_user(acting_user),
f"/api/v1/user_groups/{support_group.id}/subgroups",
info=params,
)
if error_msg is None:
self.assert_json_success(result)
self.assert_subgroup_membership(support_group, [])
# Add the subgroup again to test further cases.
add_subgroups_to_user_group(support_group, [leadership_group], acting_user=None)
else:
self.assert_json_error(result, error_msg)
# Set permissions for managing all groups to "Nobody" group to
# test permission with can_manage_group.
nobody_group = NamedUserGroup.objects.get(
name=SystemGroups.NOBODY, realm=realm, is_system_group=True
)
do_change_realm_permission_group_setting(
realm, "can_manage_all_groups", nobody_group, acting_user=None
)
owners_group = NamedUserGroup.objects.get(
name=SystemGroups.OWNERS, realm=realm, is_system_group=True
)
do_change_user_group_permission_setting(
support_group,
"can_manage_group",
owners_group,
acting_user=None,
)
check_remove_subgroups_from_group("iago", "Insufficient permission")
check_remove_subgroups_from_group("desdemona")
# Test case when setting is set to a non-system group.
prospero = self.example_user("prospero")
test_group = check_add_user_group(realm, "test", [prospero], acting_user=prospero)
do_change_user_group_permission_setting(
support_group,
"can_manage_group",
test_group,
acting_user=None,
)
check_remove_subgroups_from_group("desdemona", "Insufficient permission")
check_remove_subgroups_from_group("prospero")
# Test case when setting is set to an anonymous group.
setting_group = self.create_or_update_anonymous_group_for_setting(
direct_members=[othello],
direct_subgroups=[owners_group],
)
do_change_user_group_permission_setting(
support_group,
"can_manage_group",
setting_group,
acting_user=None,
)
check_remove_subgroups_from_group("prospero", "Insufficient permission")
check_remove_subgroups_from_group("iago", "Insufficient permission")
check_remove_subgroups_from_group("desdemona")
check_remove_subgroups_from_group("othello")
# Set can_manage_group setting to nobody, so we can test
# can_manage_all_groups behavior.
do_change_user_group_permission_setting(
support_group,
"can_manage_group",
nobody_group,
acting_user=None,
)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
owners_group,
acting_user=None,
)
check_remove_subgroups_from_group("desdemona")
check_remove_subgroups_from_group("iago", "Insufficient permission")
# Test case when setting is set to a non-system group.
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
test_group,
acting_user=None,
)
check_remove_subgroups_from_group("desdemona", "Insufficient permission")
check_remove_subgroups_from_group("prospero")
# Test case when setting is set to an anonymous group.
setting_group = self.create_or_update_anonymous_group_for_setting(
direct_members=[othello],
direct_subgroups=[owners_group],
)
do_change_realm_permission_group_setting(
realm,
"can_manage_all_groups",
setting_group,
acting_user=None,
)
check_remove_subgroups_from_group("prospero", "Insufficient permission")
check_remove_subgroups_from_group("iago", "Insufficient permission")
check_remove_subgroups_from_group("desdemona")
check_remove_subgroups_from_group("othello")
def test_get_is_user_group_member_status(self) -> None:
self.login("iago")
realm = get_realm("zulip")
desdemona = self.example_user("desdemona")
iago = self.example_user("iago")
othello = self.example_user("othello")
2024-04-18 12:23:46 +02:00
admins_group = NamedUserGroup.objects.get(
realm=realm, name=SystemGroups.ADMINISTRATORS, is_system_group=True
)
# Invalid user ID.
result = self.client_get(f"/json/user_groups/{admins_group.id}/members/1111")
self.assert_json_error(result, "No such user")
# Invalid user group ID.
result = self.client_get(f"/json/user_groups/1111/members/{iago.id}")
self.assert_json_error(result, "Invalid user group")
lear_realm = get_realm("lear")
lear_cordelia = self.lear_user("cordelia")
lear_test_group = check_add_user_group(
lear_realm, "test", [lear_cordelia], acting_user=lear_cordelia
)
result = self.client_get(
f"/json/user_groups/{lear_test_group.id}/members/{lear_cordelia.id}"
)
self.assert_json_error(result, "Invalid user group")
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{admins_group.id}/members/{othello.id}").content
)
self.assertFalse(result_dict["is_user_group_member"])
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{admins_group.id}/members/{iago.id}").content
)
self.assertTrue(result_dict["is_user_group_member"])
# Checking membership of not a direct member but member of a subgroup.
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{admins_group.id}/members/{desdemona.id}").content
)
self.assertTrue(result_dict["is_user_group_member"])
# Checking membership of not a direct member but member of a subgroup when passing
# recursive parameter as False.
params = {"direct_member_only": orjson.dumps(True).decode()}
result_dict = orjson.loads(
self.client_get(
f"/json/user_groups/{admins_group.id}/members/{desdemona.id}", info=params
).content
)
self.assertFalse(result_dict["is_user_group_member"])
# Logging in with a user not part of the group.
self.login("hamlet")
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{admins_group.id}/members/{iago.id}").content
)
self.assertTrue(result_dict["is_user_group_member"])
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{admins_group.id}/members/{othello.id}").content
)
self.assertFalse(result_dict["is_user_group_member"])
# Check membership of deactivated user.
do_deactivate_user(iago, acting_user=None)
result = self.client_get(f"/json/user_groups/{admins_group.id}/members/{iago.id}")
self.assert_json_error(result, "User is deactivated")
def test_get_user_group_members(self) -> None:
realm = get_realm("zulip")
iago = self.example_user("iago")
desdemona = self.example_user("desdemona")
shiva = self.example_user("shiva")
2024-04-18 12:23:46 +02:00
moderators_group = NamedUserGroup.objects.get(
name=SystemGroups.MODERATORS, realm=realm, is_system_group=True
)
self.login("iago")
# Test invalid user group id
result = self.client_get("/json/user_groups/1111/members")
self.assert_json_error(result, "Invalid user group")
lear_realm = get_realm("lear")
lear_cordelia = self.lear_user("cordelia")
lear_test_group = check_add_user_group(
lear_realm, "test", [lear_cordelia], acting_user=lear_cordelia
)
result = self.client_get(f"/json/user_groups/{lear_test_group.id}/members")
self.assert_json_error(result, "Invalid user group")
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{moderators_group.id}/members").content
)
self.assertCountEqual(result_dict["members"], [desdemona.id, iago.id, shiva.id])
params = {"direct_member_only": orjson.dumps(True).decode()}
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{moderators_group.id}/members", info=params).content
)
self.assertCountEqual(result_dict["members"], [shiva.id])
# User not part of a group can also get its members.
self.login("hamlet")
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{moderators_group.id}/members").content
)
self.assertCountEqual(result_dict["members"], [desdemona.id, iago.id, shiva.id])
params = {"direct_member_only": orjson.dumps(True).decode()}
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{moderators_group.id}/members", info=params).content
)
self.assertCountEqual(result_dict["members"], [shiva.id])
# Check deactivated users are not returned in members list.
do_deactivate_user(shiva, acting_user=None)
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{moderators_group.id}/members", info=params).content
)
self.assertCountEqual(result_dict["members"], [])
params = {"direct_member_only": orjson.dumps(False).decode()}
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{moderators_group.id}/members", info=params).content
)
self.assertCountEqual(result_dict["members"], [desdemona.id, iago.id])
def test_get_subgroups_of_user_group(self) -> None:
realm = get_realm("zulip")
2024-04-18 12:23:46 +02:00
owners_group = NamedUserGroup.objects.get(
name=SystemGroups.OWNERS, realm=realm, is_system_group=True
)
2024-04-18 12:23:46 +02:00
admins_group = NamedUserGroup.objects.get(
name=SystemGroups.ADMINISTRATORS, realm=realm, is_system_group=True
)
2024-04-18 12:23:46 +02:00
moderators_group = NamedUserGroup.objects.get(
name=SystemGroups.MODERATORS, realm=realm, is_system_group=True
)
self.login("iago")
# Test invalid user group id
result = self.client_get("/json/user_groups/1111/subgroups")
self.assert_json_error(result, "Invalid user group")
lear_realm = get_realm("lear")
lear_cordelia = self.lear_user("cordelia")
lear_test_group = check_add_user_group(
lear_realm, "test", [lear_cordelia], acting_user=lear_cordelia
)
result = self.client_get(f"/json/user_groups/{lear_test_group.id}/subgroups")
self.assert_json_error(result, "Invalid user group")
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{moderators_group.id}/subgroups").content
)
self.assertEqual(set(result_dict["subgroups"]), {admins_group.id, owners_group.id})
params = {"direct_subgroup_only": orjson.dumps(True).decode()}
result_dict = orjson.loads(
self.client_get(
f"/json/user_groups/{moderators_group.id}/subgroups", info=params
).content
)
self.assertCountEqual(result_dict["subgroups"], [admins_group.id])
# User not part of a group can also get its subgroups.
self.login("hamlet")
result_dict = orjson.loads(
self.client_get(f"/json/user_groups/{moderators_group.id}/subgroups").content
)
self.assertEqual(set(result_dict["subgroups"]), {admins_group.id, owners_group.id})
params = {"direct_subgroup_only": orjson.dumps(True).decode()}
result_dict = orjson.loads(
self.client_get(
f"/json/user_groups/{moderators_group.id}/subgroups", info=params
).content
)
self.assertCountEqual(result_dict["subgroups"], [admins_group.id])
user_groups: Make locks required for updating user group memberships. **Background** User groups are expected to comply with the DAG constraint for the many-to-many inter-group membership. The check for this constraint has to be performed recursively so that we can find all direct and indirect subgroups of the user group to be added. This kind of check is vulnerable to phantom reads which is possible at the default read committed isolation level because we cannot guarantee that the check is still valid when we are adding the subgroups to the user group. **Solution** To avoid having another transaction concurrently update one of the to-be-subgroup after the recursive check is done, and before the subgroup is added, we use SELECT FOR UPDATE to lock the user group rows. The lock needs to be acquired before a group membership change is about to occur before any check has been conducted. Suppose that we are adding subgroup B to supergroup A, the locking protocol is specified as follows: 1. Acquire a lock for B and all its direct and indirect subgroups. 2. Acquire a lock for A. For the removal of user groups, we acquire a lock for the user group to be removed with all its direct and indirect subgroups. This is the special case A=B, which is still complaint with the protocol. **Error handling** We currently rely on Postgres' deadlock detection to abort transactions and show an error for the users. In the future, we might need some recovery mechanism or at least better error handling. **Notes** An important note is that we need to reuse the recursive CTE query that finds the direct and indirect subgroups when applying the lock on the rows. And the lock needs to be acquired the same way for the addition and removal of direct subgroups. User membership change (as opposed to user group membership) is not affected. Read-only queries aren't either. The locks only protect critical regions where the user group dependency graph might violate the DAG constraint, where users are not participating. **Testing** We implement a transaction test case targeting some typical scenarios when an internal server error is expected to happen (this means that the user group view makes the correct decision to abort the transaction when something goes wrong with locks). To achieve this, we add a development view intended only for unit tests. It has a global BARRIER that can be shared across threads, so that we can synchronize them to consistently reproduce certain potential race conditions prevented by the database locks. The transaction test case lanuches pairs of threads initiating possibly conflicting requests at the same time. The tests are set up such that exactly N of them are expected to succeed with a certain error message (while we don't know each one). **Security notes** get_recursive_subgroups_for_groups will no longer fetch user groups from other realms. As a result, trying to add/remove a subgroup from another realm results in a UserGroup not found error response. We also implement subgroup-specific checks in has_user_group_access to keep permission managing in a single place. Do note that the API currently don't have a way to violate that check because we are only checking the realm ID now.
2023-06-17 04:39:52 +02:00
def test_add_subgroup_from_wrong_realm(self) -> None:
iago = self.example_user("iago")
other_realm = do_create_realm("other", "Other Realm")
other_user_group = check_add_user_group(other_realm, "user_group", [], acting_user=iago)
user_groups: Make locks required for updating user group memberships. **Background** User groups are expected to comply with the DAG constraint for the many-to-many inter-group membership. The check for this constraint has to be performed recursively so that we can find all direct and indirect subgroups of the user group to be added. This kind of check is vulnerable to phantom reads which is possible at the default read committed isolation level because we cannot guarantee that the check is still valid when we are adding the subgroups to the user group. **Solution** To avoid having another transaction concurrently update one of the to-be-subgroup after the recursive check is done, and before the subgroup is added, we use SELECT FOR UPDATE to lock the user group rows. The lock needs to be acquired before a group membership change is about to occur before any check has been conducted. Suppose that we are adding subgroup B to supergroup A, the locking protocol is specified as follows: 1. Acquire a lock for B and all its direct and indirect subgroups. 2. Acquire a lock for A. For the removal of user groups, we acquire a lock for the user group to be removed with all its direct and indirect subgroups. This is the special case A=B, which is still complaint with the protocol. **Error handling** We currently rely on Postgres' deadlock detection to abort transactions and show an error for the users. In the future, we might need some recovery mechanism or at least better error handling. **Notes** An important note is that we need to reuse the recursive CTE query that finds the direct and indirect subgroups when applying the lock on the rows. And the lock needs to be acquired the same way for the addition and removal of direct subgroups. User membership change (as opposed to user group membership) is not affected. Read-only queries aren't either. The locks only protect critical regions where the user group dependency graph might violate the DAG constraint, where users are not participating. **Testing** We implement a transaction test case targeting some typical scenarios when an internal server error is expected to happen (this means that the user group view makes the correct decision to abort the transaction when something goes wrong with locks). To achieve this, we add a development view intended only for unit tests. It has a global BARRIER that can be shared across threads, so that we can synchronize them to consistently reproduce certain potential race conditions prevented by the database locks. The transaction test case lanuches pairs of threads initiating possibly conflicting requests at the same time. The tests are set up such that exactly N of them are expected to succeed with a certain error message (while we don't know each one). **Security notes** get_recursive_subgroups_for_groups will no longer fetch user groups from other realms. As a result, trying to add/remove a subgroup from another realm results in a UserGroup not found error response. We also implement subgroup-specific checks in has_user_group_access to keep permission managing in a single place. Do note that the API currently don't have a way to violate that check because we are only checking the realm ID now.
2023-06-17 04:39:52 +02:00
realm = get_realm("zulip")
zulip_group = check_add_user_group(realm, "zulip_test", [], acting_user=iago)
user_groups: Make locks required for updating user group memberships. **Background** User groups are expected to comply with the DAG constraint for the many-to-many inter-group membership. The check for this constraint has to be performed recursively so that we can find all direct and indirect subgroups of the user group to be added. This kind of check is vulnerable to phantom reads which is possible at the default read committed isolation level because we cannot guarantee that the check is still valid when we are adding the subgroups to the user group. **Solution** To avoid having another transaction concurrently update one of the to-be-subgroup after the recursive check is done, and before the subgroup is added, we use SELECT FOR UPDATE to lock the user group rows. The lock needs to be acquired before a group membership change is about to occur before any check has been conducted. Suppose that we are adding subgroup B to supergroup A, the locking protocol is specified as follows: 1. Acquire a lock for B and all its direct and indirect subgroups. 2. Acquire a lock for A. For the removal of user groups, we acquire a lock for the user group to be removed with all its direct and indirect subgroups. This is the special case A=B, which is still complaint with the protocol. **Error handling** We currently rely on Postgres' deadlock detection to abort transactions and show an error for the users. In the future, we might need some recovery mechanism or at least better error handling. **Notes** An important note is that we need to reuse the recursive CTE query that finds the direct and indirect subgroups when applying the lock on the rows. And the lock needs to be acquired the same way for the addition and removal of direct subgroups. User membership change (as opposed to user group membership) is not affected. Read-only queries aren't either. The locks only protect critical regions where the user group dependency graph might violate the DAG constraint, where users are not participating. **Testing** We implement a transaction test case targeting some typical scenarios when an internal server error is expected to happen (this means that the user group view makes the correct decision to abort the transaction when something goes wrong with locks). To achieve this, we add a development view intended only for unit tests. It has a global BARRIER that can be shared across threads, so that we can synchronize them to consistently reproduce certain potential race conditions prevented by the database locks. The transaction test case lanuches pairs of threads initiating possibly conflicting requests at the same time. The tests are set up such that exactly N of them are expected to succeed with a certain error message (while we don't know each one). **Security notes** get_recursive_subgroups_for_groups will no longer fetch user groups from other realms. As a result, trying to add/remove a subgroup from another realm results in a UserGroup not found error response. We also implement subgroup-specific checks in has_user_group_access to keep permission managing in a single place. Do note that the API currently don't have a way to violate that check because we are only checking the realm ID now.
2023-06-17 04:39:52 +02:00
self.login("iago")
result = self.client_post(
f"/json/user_groups/{zulip_group.id}/subgroups",
{"add": orjson.dumps([other_user_group.id]).decode()},
)
self.assert_json_error(result, f"Invalid user group ID: {other_user_group.id}")
# Having a subgroup from another realm is very unlikely because we do
# not allow cross-realm subgroups being added in the first place. But we
# test the handling in this scenario for completeness.
add_subgroups_to_user_group(zulip_group, [other_user_group], acting_user=None)
result = self.client_post(
f"/json/user_groups/{zulip_group.id}/subgroups",
{"delete": orjson.dumps([other_user_group.id]).decode()},
)
self.assert_json_error(result, f"Invalid user group ID: {other_user_group.id}")